数据开发常用
生长中
1 个月前
1 个月前
知识图谱

Spark 作业的基本运行原理

如图中所示,提交作业后会启动一个 Driver 进程(本地或集群节点上,取决于 deploy-mode),Driver 首先去资源管理器(如 YARN)要资源,让集群在多台机器上启动多个 Executor 进程。每个 Executor 都有自己的内存和 CPU 核心。然后Driver 把你的代码拆成多个阶段(stage),再把每个阶段切成很多最小计算单元(task),分发到各个 Executor 去并行执行。每个 task 都跑同一段逻辑,但处理的数据分片不同。一个阶段的所有 task 完成后,会把中间结果写到各节点本地磁盘;然后Driver就会调度运行下一个stage。Stage 的划分是根据 Shuffle 类型的算子来进行划分的,Shuffle 类型的算子包括 groupByKeyreduceByKeyjoincogroup 等。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

常用函数

时间函数

时间函数是在开发中最常用的函数之一了,因为很多的分区都使用的是时间分区,以及时间有各种各样的格式以及需要进行各种计算,所以熟练的掌握时间的处理可以极大的提高我们的开发效率。

to_date:返回日期字符串的日期值。只针对特定格式的日期字符串,例如 yyyy-MM-dd 不支持 yyyyMMdd

SELECT to_date('2025-11-26 10:32:00') AS date;
-- 输出:2025-11-26
SELECT to_date('20251126') AS date;
-- 输出 NULL

from_unixtime:转化当前时间戳为日期字符串。

第一个参数为10位的时间戳,如果是13位必须cast(13位时间戳/1000 as bigint) 转换后再进行实践转换

SELECT from_unixtime(1700998320) AS date;
-- 输出:2025-11-26 10:32:00
SELECT from_unixtime(1700998320000,'yyyy-MM-dd HH:mm:ss') AS date;
-- 输出:2025-11-26 10:32:00

unix_timestamp:返回日期字符串的时间戳。

SELECT unix_timestamp('2025-11-26 10:32:00') AS timestamp;
-- 输出:1700998320
SELECT unix_timestamp('20251126','yyyyMMdd');
-- 输出:1764115200

条件函数

if: 三目运算符,根据条件返回不同的值。

SELECT if(1=1,'true','false') AS result;
-- 输出:true
SELECT if(1=2,'true','false') AS result;
-- 输出:false

case when: 多分支条件判断,根据不同的条件返回不同的值。

SELECT case when 1=1 then 'true' when 1=2 then 'false' else 'unknown' end AS result;
-- 输出:true
SELECT case when 1=2 then 'true' else 'false' end AS result;
-- 输出:false

case when 语法是有短路求值的,如果满足第一个when 判断条件,就不会继续往下判断,这点需要注意,如果满足多个when条件,优先取的值when要放在前面。

coalesce:返回第一个非空值。如果所有参数都是空值,返回空值。

SELECT coalesce(null,'a','b','c') AS result;
-- 输出:a
SELECT coalesce(null,null,null) AS result;
-- 输出:null

nvl:如果第一个参数为null,返回第二个参数;否则返回第一个参数。

SELECT nvl(null,'a') AS result;
-- 输出:a
SELECT nvl('b','a') AS result;
-- 输出:b

数值计算函数

rand:返回一个0到1之间的随机数,也可以用于随机取样。

SELECT rand() AS random_number;
-- 输出:0.12345678901234567890

例如我们想获取随机的1000 条数据

SELECT * FROM table_name
order by rand()
limit 1000

round:四舍五入到指定的小数位数。

SELECT round(1.2345,2) AS result;
-- 输出:1.23

ceil:向上取整,返回大于或等于该值的最小整数。

SELECT ceil(1.2345) AS result;
-- 输出:2

floor:向下取整,返回小于或等于该值的最大整数。

SELECT floor(1.2345) AS result;
-- 输出:1

窗口函数

窗口函数是在 SQL 中用于对结果集进行分组计算的函数。它可以在不改变结果行数的情况下,为每一行添加额外的计算结果。窗口函数通常与 OVER 子句一起使用,用于指定窗口的范围和分区。

相对位置函数

lead: 返回分组内当前行后面的第n行的值。如果当前行后面没有足够的行,则返回 NULL。

SELECT lead(salary,1) OVER (PARTITION BY dept ORDER BY uid) AS next_salary;

lag: 返回分组内当前行前面的第n行的值。如果当前行前面没有足够的行,则返回 NULL。

SELECT lag(salary,1) OVER (PARTITION BY dept ORDER BY uid) AS prev_salary;

leadlag 函数通常可以用于去计算波峰波谷,例如常见的面试题如下:找到价格波动的波峰和波谷。

item_idtime_pointprice备注
item_0012023-12-01 10:00100初始价格
item_0012023-12-01 10:01120上升
item_0012023-12-01 10:02150波峰 (前值120,后值130)
item_0012023-12-01 10:03130下降
item_0012023-12-01 10:04110下降
item_0012023-12-01 10:0580波谷 (前值110,后值90)
item_0012023-12-01 10:0690上升
item_0012023-12-01 10:0790平稳
item_0012023-12-01 10:08160波峰 (前值90,后值100)
item_0012023-12-01 10:09100剧烈下降

我们可以用下面的方法来判断:

SELECT  *,
        CASE
            WHEN lead_time_price>price AND lag_time_price>price THEN '波谷'
            WHEN lead_time_price<price AND lag_time_price<price THEN '波峰'
        END price_type
FROM    (
            SELECT  *,
                    lead(price, 1) OVER (PARTITION BY item_id ORDER BY time_point ) AS lead_time_price,
                    lag(price, 1) OVER ( PARTITION BY item_id ORDER BY time_point ) AS lag_time_price
            FROM    t_price_history
        )

排名函数

first_value:返回分组内的第一个值。

SELECT first_value('a') OVER (PARTITION BY 1 ORDER BY 1) AS result;
-- 输出:a

last_value:返回分组内的最后一个值。

在 SQL 标准中,如果你没有显式指定窗口范围(ROWS/RANGE ...),LAST_VALUE 的默认窗口范围通常是:RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW(从分区的起始行到当前行)这意味着:默认情况下,LAST_VALUE 取的是 “当前行” 的值,而不是整个分区的最后一个值。 这往往不是用户想要的结果。所以,一般来说 不推荐使用last_value 可以使用first_value将数据倒序排序后来实现

deptuidsalary
A1100
A2200
A3300

对于这样的数据,例如我们想要获得A 部门的最后一个 uid 的数据,我们一般会使用如下的方法

LAST_VALUE(salary) OVER (PARTITION BY dept ORDER BY uid)

但是由于last_value 默认窗口是分区的起始到当前行,所以其计算过程如下:

计算过程:

  • 当处理 id=1 时,窗口范围是 id=1id=1。最后一个值是 100。
  • 当处理 id=2 时,窗口范围是 id=1id=2。最后一个值是 200。
  • 当处理 id=3 时,窗口范围是 id=1id=3。最后一个值是 300。
  • 结果:你得到的结果和 salary 列本身一模一样,而不是你期望的每一行都是 300。

所以我们可以通过三个方法来解决:

  1. 显式指定窗口范围:
LAST_VALUE(salary) OVER (PARTITION BY dept ORDER BY uid ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
  1. 使用first_value 将数据倒序排序后取第一个值:
FIRST_VALUE(salary) OVER (PARTITION BY dept ORDER BY uid DESC)
  1. 使用 max 函数:
MAX(salary) OVER (PARTITION BY dept)

聚合函数

sum: 对分组内的数值进行求和。

with t(dept,uid,salary) as (
    select 'A', 1, 100 union all
    select 'A', 2, 200 union all
    select 'A', 3, 300 union all
    select 'B', 1, 400 union all
    select 'B', 2, 500
)

sum 的Order By 子句是可选的。如果不指定Order By 子句,那么sum 函数会对分组内的所有行进行求和。如果指定了Order By 子句,那么sum 函数会对分组内的行按照Order By 子句进行排序后再求和。所以排序后的求和是分组内的累计到当前行的和。

select 
    dept,
    sum(salary) over (partition by dept order by uid) as cum_salary
from t
+-------+-------------+
| dept  | cum_salary  |
+-------+-------------+
| A     | 100         |
| A     | 300         |
| A     | 600         |
| B     | 400         |
| B     | 900         |
+-------+-------------+
select 
    dept,
    sum(salary) over (partition by dept order by uid) as cum_salary
    sum(salary) over (partition by dept) as cum_salary
from t
+-------+-------------+
| dept  | cum_salary  |
+-------+-------------+
| A     | 600         |
| A     | 600         |
| A     | 600         |
| B     | 900         |
| B     | 900         |
+-------+-------------+

任务优化

数据倾斜

数据倾斜是指在Spark计算过程中,由于数据分布不均匀,导致某些节点处理的数据量远超其他节点,形成计算负载的 "长尾效应" 。这种现象会显著降低集群整体吞吐量,甚至引发OOM导致任务失败。

数据倾斜发生的现象: 绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。具体的体现我们可以从Spark 的 web UI 或者metris 中查看。

  • 查看 metris 中的 CPU 和内存的使用情况,看看是否有 长尾效应
  • 查看 Spark 的 web UI 中的 Stage Tab,看看是否有Max 处理的数据量和时间明显偏大,如果有的话那就说着在这个 Stage 中发生了数据倾斜

数据倾斜发生的原理: 在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

背压

详见--> Flink 背压

注意事项

常见的数据注意事项

  1. 区分NULL、'' (空字符串)。NULL 是用 is null、is not null 来查询;''是用 ='' 、!=''来查询。
  2. 注意 where !=会将NULL值过滤掉
  3. and 优先级比 or 高 。用or的时候要注意用括号,来保障正确语义
  4. 字符串比较不同于整型比较,是从左到右顺序进行比较。 在同一位置出现某一个字符不同,即可区分两个字符串的大小,如果各位置相同,字符数多的大。

避免使用 distinct

Hive count(distinct xxx)只产生一个reduce,这就造成了所有map端传来的数据都在一个tasks中执行,成为了性能瓶颈。(简单来说就是无法发挥分布式计算的优势,本应多台机器完成的事情,只能一台机器去工作)。所以我们要先了解数据,判断是否必须使用 distinct,如果要使用distinct,也可以使用 group by 来进行代替。例如下面的案例:

使用 distinct

SELECT DISTINCT colA, colB, colC
FROM my_table;

使用 group by 代替 distinct

SELECT colA, colB, colC
FROM my_table
GROUP BY colA, colB, colC;

Flink 流式计算

两阶段提交

两阶段提交(Two-Phase Commit,简称 2PC) ,就是为了去完成我们常说的 " Exactly Once" 语义。即如何保证 Flink 内部的处理结果和外部系统(比如 Kafka、MySQL)的数据状态是完全一致的?。2PC 主要分为了两个阶段,分别如下:

  1. 预提交阶段(Pre-Commit Phase): JobManager 向所有 Source(数据源)发送一个令牌(Barrier)。这个令牌会随着数据流向下游传递,当数据流到 Sink 输出端的时候,Sink 会开启一个事务,将数据写入进去,但是此时属于不可见的状态,每一个算子都将状态保存到状态后端,并回复给 JobManager ACK 确认。
  2. 确认阶段(Commit Phase): 当 JobManager 收到所有算子的 ACK 确认后,会向所有算子发送一个提交事务的指令。算子收到指令后,会将数据写入到外部系统中,并且确认事务完成。如果在提交事务的过程中发生了异常,比如网络分区、超时等,JobManager 会向所有算子发送一个回滚事务的指令。算子收到指令后,会撤销之前写入的数据,并且回复 JobManager 确认回滚完成。

结合下文的 Checkpoint 的原理,就会发现 2PC 与 CheckPoint 是息息相关的,实际上 2PC 的预提交阶段就是 Checkpoint 的触发阶段,确认阶段就是 Checkpoint 的提交阶段。

状态管理

状态后端

Checkpoint

Checkpoint(检查点)Flink 中用于实现 Exactly-Once 语义的重要机制。它的基本原理是:当一个算子需要进行状态 checkpoint 时,它会 将当前的状态(包括算子的本地状态和外部系统的状态)保存到一个持久化存储(如 HDFS、S3 等)中。 同时,它会向 JobManager 发送一个确认消息,确认 checkpoint 已经完成。如果在 checkpoint 过程中发生了故障,JobManager 会触发一个回滚操作,将所有算子的状态恢复到 checkpoint 之前的状态。

Checkpoint实现原理: JobManager 向所有 Source(数据源)发送一个令牌(Barrier)。这个令牌会随着数据流向下游传递,当 Source 算子收到令牌后,会将当前的状态保存到状态后端。同时,它会向 JobManager 发送一个确认消息,确认 checkpoint 已经完成。当 JobManager 收到所有算子的确认消息后,即认为 checkpoint 完成。

双流Join

双流 join 也是在 Flink 作业的开发中常见的一种操作。毕竟我们平时写离线作业的时候就经常需要将作业进行 join 操作,将两个不同的数据集根据某个共同的字段进行关联。而在 Flink 中,双流 join 操作的实现原理与离线作业中的 join 操作类似,都是通过将两个数据集根据共同的字段进行关联,然后将关联结果输出到下游算子进行处理。这里将双流 Join 划分为了 3 中方式:基于时间的 join,基于窗口的 join,三流 join

基于时间的 join:两个流中相同的键值以及时间戳不超过某一范围的数据做 join,也是 Flink 作业中默认的 join 方式,例如下面的案例:

SELECT 
    o.order_id, 
    o.order_time, 
    s.ship_time, 
    o.price
FROM Orders o
JOIN Shipments s ON o.order_id = s.order_id
WHERE s.ship_time BETWEEN o.order_time AND o.order_time + INTERVAL '4' HOUR;

基于窗口的 join:将两个流的数据输入到公共窗口中进行 join

必须保证两条流的 window_start 和 window_end 完全一致,才能在窗口结束时触发 Join。

SELECT 
    L.ad_id, 
    L.window_start, 
    L.window_end, 
    L.click_count, 
    R.view_count
FROM (
    -- 这里的 ClickTable 和 ViewTable 已经通过 Window TVF 进行了开窗
    SELECT * FROM TABLE(TUMBLE(TABLE ClickTable, DESCRIPTOR(rowtime), INTERVAL '1' MINUTES))
) L
JOIN (
    SELECT * FROM TABLE(TUMBLE(TABLE ViewTable, DESCRIPTOR(rowtime), INTERVAL '1' MINUTES))
) R 
ON L.ad_id = R.ad_id AND L.window_start = R.window_start AND L.window_end = R.window_end;

三流 join:将双流 join 的结果保存到 Redis,再和第三个流进行 join

水位线

水位线(Watermark)是 Flink 中用于处理乱序事件时间(Event Time)的重要机制。它的基本原理是:当一个算子需要处理事件时间为 T 的数据时,它会先检查是否存在一个水位线为 T 的记录。如果存在,说明所有时间戳小于等于 T 的数据都已经到达,未来不会再有比 T 更早的数据了。因此,算子可以安全地处理时间戳为 T 的数据。

但是这里需要注意的是Flink 的一个DataStream 或者 Table 只能有一个活跃的水位线生成逻辑,

所以当需要使用同一个源计算不同的需求的时候, 需要将数据源转换为两个逻辑流 ,每个流里拥有自己独立的时间线。见下面的例子:

-- 原始 Kafka 表,不直接定义水印
CREATE TABLE raw_orders (
    order_id STRING,
    order_time TIMESTAMP(3),
    pay_time TIMESTAMP(3),
    ...
) WITH (...);
 
-- 视图 1:以下单时间为基准
CREATE VIEW view_order_time AS
SELECT *, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
FROM raw_orders;
 
-- 视图 2:以支付时间为基准
CREATE VIEW view_pay_time AS
SELECT *, WATERMARK FOR pay_time AS pay_time - INTERVAL '5' SECOND
FROM raw_orders
WHERE pay_time IS NOT NULL; -- 排除未支付的数据,防止水印停滞
 
-- 统计下单量
INSERT INTO sink_table_a
SELECT window_start, window_end, COUNT(order_id)
FROM TABLE(TUMBLE(TABLE view_order_time, DESCRIPTOR(order_time), INTERVAL '1' HOURS))
GROUP BY window_start, window_end;
 
-- 统计支付量
INSERT INTO sink_table_b
SELECT window_start, window_end, COUNT(order_id)
FROM TABLE(TUMBLE(TABLE view_pay_time, DESCRIPTOR(pay_time), INTERVAL '1' HOURS))
GROUP BY window_start, window_end;

反压

详见--> Flink 背压

数开思想

Flink 数据流(Dataflow)模型最本质的特征就在于一个 “流” 字。不仅业务数据在流动,控制信号也在流动。Flink 将控制逻辑内化为特殊的数据包,注入到业务数据流中,使其随数据一同流转。这种设计使得分布式系统中的时间推进、状态一致性等复杂问题变得优雅而高效。以下是这一思想在 Flink 核心机制中的具体体现:

时间推进机制:Watermark(水位线):Watermark 是“流”思想在处理乱序事件时间(Event Time)时的经典应用,Watermark 本质上是一种携带时间戳的特殊数据记录,被源源不断地注入到数据流中,当一个算子接收到时间戳为 TTT 的 Watermark 时,它传达了一个明确的信号:时间戳小于等于 T 的所有数据都已经到达,未来不会再有比 T 更早的数据了。 它随着数据流向下游流动,触发窗口计算的执行。这种机制让系统无需依赖不确定的物理时间,而是通过流动的逻辑时钟来精准控制计算进度。

容错一致性:Checkpoint(检查点):在 Flink 的分布式快照(Checkpoint)机制中,“流”的思想同样得到了淋漓尽致的体现。Barrier(分界线)是一种轻量级的控制记录,由 JobManager 周期性地注入到 Source 端的并行数据流中。Barrier 将数据流切分为“前”和“后”两个部分。Barrier 之前的数据属于当前的 Checkpoint N,而 Barrier 之后的数据属于下一个 Checkpoint N+1。当算子从输入流中接收到 Barrier 时,它明白这是一个信号:现在需要为当前状态制作快照了 算子暂停处理新数据(在 Exactly-Once 语义下需进行 Barrier 对齐),将当前状态异步保存到持久化存储中。保存完毕后,算子向 JobManager 发送 ACK 确认,并将 Barrier 继续广播发送给下游的所有算子。

语法转换思想

在现代软件开发中,无论是大数据计算(如 SparkFlink),还是前端的内容呈现(如 Markdown、Vue/React 模板),都广泛存在着 “语言转换” 的机制。其核心逻辑在于:开发者使用高抽象级、易于表达的语言(DSL)描述“做什么(What)”,而系统负责将其转换为底层运行时能够理解的“怎么做(How)”。这种从高层抽象到底层执行的转换过程,通常遵循经典的编译管道设计,主要包含以下四个关键阶段:

词法与语法解析 (Lexical Analysis & Parsing):代码-->Token-->AST语法树

将代码字符串拆解为一个个最小的语义单元(Token),例如将 SELECT * 拆解为 SELECT 关键字和 * 符号。根据语法规则,将 Token 组装成抽象语法树(AST, Abstract Syntax Tree)。AST 是代码逻辑的树状结构表示,是后续所有步骤的基础。

语义分析与逻辑计划 (Semantic Analysis & Logical Planning):数据绑定,验证合法性

检查引用的表、字段、变量是否存在,类型是否匹配。将 AST 转换为更通用的逻辑操作树。此时只关心业务逻辑(如“过滤”、“连接”),不关心具体物理实现(如“是用 HashJoin 还是 SortMergeJoin”)。Catalog 介入,验证表名和字段,生成 Analyzed Logical Plan

优化执行 (Optimization):优化逻辑计划

  • 规则优化 (RBO):例如“谓词下推”(Predicate Pushdown),即尽早过滤数据,减少后续计算量。
  • 代价优化 (CBO):基于数据量估算,选择代价最小的算法(例如大表 Join 小表时自动选择 Broadcast Join)。
  • Catalyst 优化器通过一系列规则将逻辑计划转化为优化后的逻辑计划。

物理计划生成与执行 (Physical Planning & Execution):将计划翻译为目标平台的原生代码并执行。

参考

评论