4、实时数仓项目优化
约 14734 字大约 49 分钟
2026-01-17
实时项目优化
所有的优化,优先级最高的当然是资源的配置,资源配置调优永远放在第一位。
在Flink程序中,资源配置就是在Source,transform,sink并行度的设置,一个并行度需要一个slot,而一个slot内存的数量往往在TaskManager刚开始启动的时候,已经确定。当然也可以进行修改,比如在per-job模式中,任务来了才启动我们的taskManager,可以重新配置,在Session中不可以,因为任务来的时候,TaskManager已经启动好了。
消费资源配置调优
Flink 性能调优的第一步,就是为任务分配合适的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。
提交方式主要是 yarn-per-job,资源的分配在使用脚本提交 Flink 任务时进行指定。
标准的 Flink 任务提交脚本(Generic CLI 模式),从 1.11 开始,增加了通用客户端模式,参数使用-D <property=value>指定
如果有时候资源已经够了,但是人为添加超过需要的资源,这样不但不会提升性能,还有可能降低性能,比如map join中大表join小表的时候,小表默认使用的最大内存为25m,如果人为增大这个值,就可能降低性能。
- 每一个map任务都会下载一份大表的数据,有网络io,浪费性能。
- 申请资源非常的耗时,在使用yarn提交任务的时候,有一个资源的申请过程,如果分配资源太多,那么申请时间就很长。
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定 yarn 队列
-Djobmanager.memory.process.size=1024mb \ 指定 JM 的总进程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每个 TM 的总进程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每个 TM 的 slot 数
-c com.atguigu.app.dwd.LogBaseApp \
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar如果是yarn-session或者standalone模式,需要在配置文件中配置资源。
内存设置
生产资源配置:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定 yarn 队列
-Djobmanager.memory.process.size=2048mb \ JM2~4G 足够
-Dtaskmanager.memory.process.size=6144mb \ 单个 TM2~8G 足够
-Dtaskmanager.numberOfTaskSlots=2 \ 与容器核数 1core:1slot 或 1core:2slot
-c com.atguigu.app.dwd.LogBaseApp \与容器核数 1core:1slot 或 1core:2slot
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jarFlink 是实时流处理,关键在于资源情况能不能抗住高峰时期每秒的数据量,通常用QPS/TPS 来描述数据情况。
对于JobManager通常2-4G已经足够。
单个TaskManager,配置2-8G左右,对于具体而言,需要根据我们的业务去测,也就是数据量达到高峰期时候的配置内存量。
那每一个TaskManager里面配置多少个slot呢?
yarn中一个container中默认资源最大是8G,那么我们通常配置与容器的核数有关,通常是1core:1slot 或 1core:2slot关系,也就是与容器的核数1:1或者1:2关系。
启动一个TaskManager需要一个Container,而Container最大是8G,所以TaskManager配置内存不能超过容器的内存,通常配置2-8G.如果超过8g,那么需要扩大Container的内存。
container的核心数也可以配置或者指定,我们配置container的核心数之后,每一个TaskManager的slot数量可以参考container的核心数量。如果资源够的情况下,配置1:1,不够情况下配置1:2.配置1:2的话那么一个cpu上需要运行两个任务,运行会慢一点。
TaskManager和Slot的配置都和yarn 的container容量有关。
在生产中,我们一般不用内存及的状态后端,所以jobManager的内存使用量一般不会太大。
并行度设置
最优并行度计算
开发完成后,先进行压测。任务并行度给 10 以下,测试单个并行度的处理上限。然后总 QPS/单并行度的处理能力 = 并行度。
不能只从 QPS 去得出并行度,因为有些字段少、逻辑简单的任务,单并行度一秒处理几万条数据。而有些数据字段多,处理逻辑复杂,单并行度一秒只能处理 1000 条数据。
最好根据高峰期的 QPS 压测,并行度*1.2 倍,富余一些资源。比如高峰期测出来是50m/s,10个并行度,那么我就一直使用50m/s,10个并行度,跑几个小时做压测,都没有问题,那么说明没问题。*1.2防止估算有误差或者数据量增加。
如何做压测,积压kafak的数据,然后对Flink程序做压测。
Source 端并行度的配置
数据源端是 Kafka,Source 的并行度设置为 Kafka 对应 Topic 的分区数。这种情况最好。
如果已经等于 Kafka 的分区数,消费速度仍跟不上数据生产速度,考虑下 Kafka 要扩大分区,同时调大并行度等于分区数。
Flink 的一个并行度可以处理一至多个分区的数据,如果并行度多于 Kafka 的分区数,那么就会造成有的并行度空闲,浪费资源。
但是这种方式一定需要在最后尝试,我们调优首先做的就是增加资源。kafka分区数量增加是一个不可逆操作,增加了分区,就不能减少。
Transform 端并行度的配置
Keyby 之前的算子
一般不会做太重的操作,都是比如 map、filter、flatmap 等处理较快的算子,并行度可以和 source 保持一致。
Keyby 之后的算子
如果并发较大,建议设置并行度为 2 的整数次幂,例如:128、256、512;
小并发任务的并行度不一定需要设置成 2 的整数次幂;
大并发任务如果没有 KeyBy,并行度也无需设置为 2 的整数次幂;
如果业务很复杂,就提高并行度。
Sink 端并行度的配置
Sink 端是数据流向下游的地方,可以根据 Sink 端的数据量及下游的服务抗压能力进行评估。
如果 Sink 端是 Kafka,可以设为 Kafka 对应 Topic 的分区数。
Sink 端的数据量小,比较常见的就是监控告警的场景,并行度可以设置的小一些。
Source 端的数据量是最小的,拿到 Source 端流过来的数据后做了细粒度的拆分,数据量不断的增加,到 Sink 端的数据量就非常大。那么在 Sink 到下游的存储中间件的时候就需要提高并行度。
另外 Sink 端要与下游的服务进行交互,并行度还得根据下游的服务抗压能力来设置,
如果在 Flink Sink 这端的数据量过大的话,且 Sink 处并行度也设置的很大,但下游的服务完全撑不住这么大的并发写入,可能会造成下游服务直接被写挂,所以最终还是要在 Sink处的并行度做一定的权衡。
sink端和外部系统交互也有延迟,所以也可以使用批量提交的方式。所以对于sink,一个是降低并行度,一个是批量写入。
并行度,是在高峰期做压测得到的。
RocksDB 大状态调优
状态后端的优化,在生产环境中存储的是大状态,存储在文件系统中。是一个kv数据库,性能非常高。状态,本地存储在TaskManager内存,远程存储在文件系统。
如果使用RocksDB,开启检查点,状态就存储在远程,没有开启就存储在本地。

RocksDB 是基于 LSM Tree 实现的(类似 HBase),写数据都是先缓存到内存中,所以 RocksDB 的写请求效率比较高。RocksDB 使用内存结合磁盘的方式来存储数据,每次获取数据时,先从内存中 blockcache 中查找,如果内存中没有再去磁盘中查询。优化后差不多单并行度 TPS 5000 record/s,性能瓶颈主要在于 RocksDB 对磁盘的读请求,所以当处理性能不够时,仅需要横向扩展并行度即可提高整个 Job 的吞吐量。以下几个调优参数:
设置本地 RocksDB 多目录
在 flink-conf.yaml 中配置:
state.backend.rocksdb.localdir:
/data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb注意:不要配置单块磁盘的多个目录,务必将目录配置到多块不同的磁盘上,让多块磁盘来分担压力。当设置多个 RocksDB 本地磁盘目录时,Flink 会随机选择要使用的目录,所以就可能存在三个并行度共用同一目录的情况。如果服务器磁盘数较多,一般不会出现该情况,但是如果任务重启后吞吐量较低,可以检查是否发生了多个并行度共用同一块磁盘的情况。
当一个 TaskManager 包含 3 个 slot 时,那么单个服务器上的三个并行度都对磁盘造成频繁读写,从而导致三个并行度的之间相互争抢同一个磁盘 io,这样务必导致三个并行度的吞吐量都会下降。设置多目录实现三个并行度使用不同的硬盘从而减少资源竞争。
如下所示是测试过程中磁盘的 IO 使用率,可以看出三个大状态算子的并行度分别对应了三块磁盘,这三块磁盘的 IO 平均使用率都保持在 45% 左右,IO 最高使用率几乎都是 100%,而其他磁盘的 IO 平均使用率相对低很多。由此可见使用 RocksDB 做为状态后端且有大状态的频繁读取时, 对磁盘 IO 性能消耗确实比较大。

如下图所示,其中两个并行度共用了 sdb 磁盘,一个并行度使用 sdj 磁盘。可以看到sdb 磁盘的 IO 使用率已经达到了 91.6%,就会导致 sdb 磁盘对应的两个并行度吞吐量大大降低,从而使得整个 Flink 任务吞吐量降低。如果每个服务器上有一两块 SSD,强烈建议将 RocksDB 的本地磁盘目录配置到 SSD 的目录下,从 HDD 改为 SSD 对于性能的提升可能比配置 10 个优化参数更有效。

state.backend.incremental:开启增量检查点,默认 false,改为 true。增量检查点只有Rocks DB有,其他两个都没有。
state.backend.rocksdb.predefined-options:SPINNING_DISK_OPTIMIZED_HIGH_MEM 设置为机械硬盘+内存模式,有条件上SSD,指定为 FLASH_SSD_OPTIMIZED
读缓存
state.backend.rocksdb.block.cache-size: 整 个 RocksDB 共 享 一 个 block cache,读数据时内存的 cache 大小,该参数越大读数据时缓存命中率越高,默认大小为 8 MB,建议设置到 64 ~ 256 MB。
写缓存
state.backend.rocksdb.writebuffer.size: RocksDB 中,每个 State 使用一个Column Family,每个 Column Family 使用独占的 write buffer,建议调大,例如:32M
state.backend.rocksdb.thread.num: 用于后台 flush 和合并 sst 文件的线程数,默认为 1,建议调大,机械硬盘用户可以改为 4 等更大的值。
state.backend.rocksdb.writebuffer.count: 每 个 Column Family 对 应 的writebuffer 数目,默认值是 2,对于机械磁盘来说,如果内存⾜够大,可以调大到 5左右。
state.backend.rocksdb.writebuffer.number-to-merge: 将数据从 writebuffer中 flush 到磁盘时,需要合并的 writebuffer 数量,默认值为 1,可以调成 3。
state.backend.local-recovery: 设置本地恢复,当 Flink 任务失败时,可以基于本地的状态信息进行恢复任务,可能不需要从 hdfs 拉取数据。
CheckPoint设置
一般我们的 Checkpoint 时间间隔可以设置为分钟级别,例如 1 分钟、3 分钟,对于状态很大的任务每次 Checkpoint 访问 HDFS 比较耗时,可以设置为 5~10 分钟一次Checkpoint,并且调大两次 Checkpoint 之间的暂停间隔,例如设置两次 Checkpoint 之间至少暂停 4 或 8 分钟。
checkpoint是防止任务挂掉能从失败中恢复,一般情况下任务不可能很快挂掉,所以建议调大时间。
如果 Checkpoint 语义配置为 EXACTLY_ONCE,那么在 Checkpoint 过程中还会存在 barrier 对齐的过程,可以通过 Flink Web UI 的 Checkpoint 选项卡来查看Checkpoint 过程中各阶段的耗时情况,从而确定到底是哪个阶段导致 Checkpoint 时间过长然后针对性的解决问题。
RocksDB 相关参数在 1.3 中已说明,可以在 flink-conf.yaml 指定,也可以在 Job 的代码中调用 API 单独指定,这里不再列出。
// 使⽤ RocksDBStateBackend 做为状态后端,并开启增量 Checkpoint
RocksDBStateBackend rocksDBStateBackend = new
RocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints", true);
env.setStateBackend(rocksDBStateBackend);
// 开启 Checkpoint,间隔为 3 分钟
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最小间隔 4 分钟
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4))
// 超时时间 10 分钟
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 保存 checkpoint
//这个参数是说在任务最后如果我们主动cancel任务后,会删除最后一次的checkpoint,设置为retain就会保留,如果任务是失败自己退出,那不会自动删除,会保留
checkpointConf.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);使用 Flink ParameterTool 读取配置
在实际开发中,有各种环境(开发、测试、预发、生产),作业也有很多的配置:算子的并行度配置、Kafka 数据源的配置(broker 地址、topic 名、group.id)、Checkpoint是否开启、状态后端存储路径、数据库地址、用户名和密码等各种各样的配置,可能每个环境的这些配置对应的值都是不一样的。
如果你是直接在代码⾥⾯写死的配置,每次换个环境去运行测试作业,都要重新去修改代码中的配置,然后编译打包,提交运行,这样就要花费很多时间在这些重复的劳动力上了。在 Flink 中可以通过使用 ParameterTool 类读取配置,它可以读取环境变量、运行参数、配置文件。ParameterTool 是可序列化的,所以你可以将它当作参数进行传递给算子的自定义函数类。

压测方式
压测的方式很简单,先在 kafka 中积压数据,之后开启 Flink 任务,出现反压,就是处理瓶颈。相当于水库先积水,一下子泄洪。数据可以是自己造的模拟数据,也可以是生产中的部分数据。
反压处理
反压产生的机制
在Flink中,每一个Task都有一个读写缓存,还每有处理的数据存放在读缓存,处理完的数据放在写缓存,如果中间某一个task处理数据慢,那么就会导致其都缓存中存在数据的挤压,所以产生了一个连锁反应,导致上游所有的task读写缓存都存在数据的挤压,那么数据源kafka采用的是拉去数据的方式,所以source不会去kafka中读数据,所以就产生了数据的延迟。
如果source端采用的是推模式,那么就会产生内存的溢出,kafka只会产生数据延迟高,因为是拉模式。
spark streaming反压机制

Spark streamming会有一个单独的Executor来接收数据,他会和Driver之间进行交互,是否接收数据,如果Driver下游的多个任务中,有一个产生了反压现象,那么他会主动给Driver报告产生反压,然后Driver会通知Executor接收数据的线程接收慢一点。
这两种模式哪一种好点?
spark的反压是否是一个全局的模式,就是下游某一个任务产生反压,那么他会向Driver报告,Driver会通知Executor发送给下游所有任务数据量少一点,那么整个集群的吞吐量就会降低。
所以是一个任务会影响整个任务吞吐量,表现在所有任务的数据量少了,但是此时可能其他任务还没有受什么影响,浪费资源。而Flink不会这样,Flink是一个逐级反馈过程。
Flink反压的好处,是一个链式反应,如果某一个任务产生反压,那么上游的算子可能会帮助产生反压的算子分担一点数据量,这样一级一级的反应,可能到source端就消失了,还可以正常消费数据。
反压处理
反压(BackPressure)通常产生于这样的场景:短时间的负载高峰导致系统接收数据的速率远高于它处理数据的速率。许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或遇到大促、秒杀活动导致流量陡增。反压如果不能得到正确的处理,可能会导致资源耗尽甚至系统崩溃。
反压机制是指系统能够自己检测到被阻塞的 Operator,然后自适应地降低源头或上游数据的发送速率,从而维持整个系统的稳定。Flink 任务一般运行在多个节点上,数据从上游算子发送到下游算子需要网络传输,若系统在反压时想要降低数据源头或上游算子数据的发送速率,那么肯定也需要网络传输。所以下面先来了解一下 Flink 的网络流控(Flink 对网络数据流量的控制)机制。
反压现象及定位
Flink 的反压太过于天然了,导致无法简单地通过监控 BufferPool 的使用情况来判断反压状态。Flink 通过对运行中的任务进行采样来确定其反压,如果一个 Task 因为反压导致处理速度降低了,那么它肯定会卡在向 LocalBufferPool 申请内存块上。那么该 Task 的stack trace 应该是这样:
java.lang.Object.wait(Native Method)
o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) [...]监控对正常的任务运行有一定影响,因此只有当 Web 页面切换到 Job 的BackPressure 页面时,JobManager 才会对该 Job 触发反压监控。默认情况下,JobManager 会触发 100 次 stack trace 采样,每次间隔 50ms 来确定反压。Web 界面 看 到 的 比 率 表 示 在 内 部 方 法 调 用 中 有 多 少 stack trace 被 卡 在LocalBufferPool.requestBufferBlocking(),例如: 0.01 表示在 100 个采样中只有 1 个被卡在 LocalBufferPool.requestBufferBlocking()。采样得到的比例与反压状态的对应关系如下:
- OK: 0 <= 比例 <= 0.10
- LOW: 0.10 < 比例 <= 0.5
- HIGH: 0.5 < 比例 <= 1
Task 的状态为 OK 表示没有反压,HIGH 表示这个 Task 被反压。其实就是表示内存使用量的关系,内存使用量超过0.5,说明内存不够,快产生反压。
利用 Flink Web UI 定位产 生
在 Flink Web UI 中有 BackPressure 的页面,通过该页面可以查看任务中 subtask的反压状态,如下两图所示,分别展示了状态是 OK 和 HIGH 的场景。
排查的时候,先把 operator chain 禁用,方便定位。因为一个任务连中包含若干个算子,我们需要具体找到哪一个算子产生反压。

从下面可以看到Ratio比率很接近1,也就是内存快满了,所以产生反压。

通常会对每一个算子的一个并行度进行反压检测,也就是说有几个并行度,就检测几个反压。如果有某一个并行度产生反压,那往往是数据倾斜导致。
可以看到,针对每一个任务都会有两块内存,读和写缓存:

web ui适合临时的查看任务情况,不适合监控,需要配合监控工具使用。
利用 Metrics 定位反压位置
利用监控项监控。
当某个 Task 吞吐量下降时,基于 Credit 的反压机制,上游不会给该 Task 发送数据,所以该 Task 不会频繁卡在向 Buffer Pool 去申请 Buffer。反压监控实现原理就是监控Task 是否卡在申请 buffer 这一步,所以遇到瓶颈的 Task 对应的反压⻚⾯必然会显示OK,即表示没有受到反压。
如果该 Task 吞吐量下降,造成该 Task 上游的 Task 出现反压时,必然会存在:该Task 对应的 InputChannel 变满,已经申请不到可用的 Buffer 空间。如果该 Task 的InputChannel 还能申请到可用 Buffer,那么上游就可以给该 Task 发送数据,上游 Task也就不会被反压了,所以说遇到瓶颈且导致上游 Task 受到反压的 Task 对应的InputChannel 必然是满的(这⾥不考虑⽹络遇到瓶颈的情况)。从这个思路出发,可以对该 Task 的 InputChannel 的使用情况进行监控,如果 InputChannel 使用率 100%,那么 该 Task 就是 我们要 找的 反压 源。 Flink 1.9 及以 上版 本 inPoolUsage 表 示inputFloatingBuffersUsage 和 inputExclusiveBuffersUsage 的总和。
反压时,可以看到遇到瓶颈的该 Task 的 inPoolUage 为 1。
反压的原因及处理
先检查基本原因,然后再深入研究更复杂的原因,最后找出导致瓶颈的原因。下面列出从最基本到比较复杂的一些反压潜在原因。
注意:反压可能是暂时的,可能是由于负载高峰、CheckPoint 或作业重启引起的数据积压而导致反压。如果反压是暂时的,应该忽略它。另外,请记住,断断续续的反压会影响我们分析和解决问题。
系统资源
检查涉及服务器基本资源的使用情况,如 CPU、网络或磁盘 I/O,目前 Flink 任务使用最主要的还是内存和 CPU 资源,本地磁盘、依赖的外部存储资源以及网卡资源一般都不会是瓶颈。如果某些资源被充分利用或大量使用,可以借助分析工具,分析性能瓶颈(JVMProfiler+ FlameGraph 生成火焰图)。
在启动任务之前,一定要去进行压力测试,测试出任务数据的最高峰,然后根据最高峰的数据率去分配资源,否则的话,某一时刻数据率非常高,资源不够就会产生压测。
- 针对特定的资源调优 Flink
- 通过增加并行度或增加集群中的服务器数量来横向扩展
- 减少瓶颈算子上游的并行度,从而减少瓶颈算子接收的数据量(不建议,可能造成整个Job 数据延迟增大)
垃圾收集(GC )
长 时 间 GC 暂 停 会 导 致 性 能 问 题 。 可 以 通 过 打 印 调 试 GC 日 志 ( 通 过-XX:+PrintGCDetails)或使用某些内存或 GC 分析器(GCViewer 工具)来验证是否处于这种情况。
GC过程中,整个任务就是静止的。
- 在 Flink 提交脚本中,设置 JVM 参数,打印 GC 日志:
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定 yarn 队列
-Djobmanager.memory.process.size=1024mb \ 指定 JM 的总进程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每个 TM 的总进程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每个 TM 的 slot 数
-Denv.java.opts="-XX:+PrintGCDetails -XX:+PrintGCDateStamps"
-c com.atguigu.app.dwd.LogBaseApp \
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar因为是 on yarn 模式,运行的节点一个一个找比较麻烦。可以打开 WebUI,选择JobManager 或者 TaskManager,点击 Stdout,即可看到 GC 日志,点击下载按钮即可将 GC 日志通过 HTTP 的方式下载下来
分析 GC 日志:
通过 GC 日志分析出单个 Flink Taskmanager 堆总大小、年轻代、老年代分配的内存空间、Full GC 后老年代剩余大小等,相关指标定义可以去 Github 具体查看。
扩展:最重要的指标是 Full GC 后,老年代剩余大小这个指标,按照《Java 性能优化权威指南》这本书 Java 堆大小计算法则,设 Full GC 后老年代剩余大小空间为 M,那么堆的大小建议 3 ~ 4 倍 M,新生代为 1 ~ 1.5 倍 M,老年代应为 2 ~ 3 倍 M。
CPU/ 线程瓶颈
有时,一个或几个线程导致 CPU 瓶颈,而整个机器的 CPU 使用率仍然相对较低,则可能无法看到 CPU 瓶颈。例如,48 核的服务器上,单个 CPU 瓶颈的线程仅占用 2%的CPU 使用率,就算单个线程发生了 CPU 瓶颈,我们也看不出来。可以考虑使用 火焰图分析工具,它们可以显示每个线程的 CPU 使用情况来识别热线程。
线程竞争
与上⾯的 CPU/线程瓶颈问题类似,subtask 可能会因为共享资源上高负载线程的竞争而成为瓶颈。同样,可以考虑使用火焰图分析工具,考虑在用户代码中查找同步开销、锁竞争,尽管避免在用户代码中添加同步。
负载不平衡
如果瓶颈是由数据倾斜引起的,可以尝试通过将数据分区的 key 进行加盐或通过实现本地预聚合来减轻数据倾斜的影响。(关于数据倾斜的详细解决方案,会在下一章节详细讨论)
外部依赖
如果发现我们的 Source 端数据读取性能比较低或者 Sink 端写入性能较差,需要检查第三方组件是否遇到瓶颈。例如,Kafka 集群是否需要扩容,Kafka 连接器是否并行度较低,HBase 的 rowkey 是否遇到热点问题。关于第三方组件的性能问题,需要结合具体的组件来分析。
使用旁路缓存+异步IO
资源:
- 系统资源
- 垃圾收集
- cpu/线程瓶颈
- 线程竞争
针对资源问题,我们可以通过各种工具去查看各种资源的使用情况,最好的方式就是在启动任务之前,做压测,在分配资源。
负载均衡数据倾斜情况
外部系统瓶颈:旁路缓存+异步IO
针对反压处理,一共三种处理方式
- 资源
- 负载均衡数据倾斜
- 外部系统
说反压,一定要明白是什么原因导致,然后在去解决。检测反压通过web ui或者监控的普罗米修斯检查是单个任务产生反压还是多个任务产生反压,检测出来的反压具体到某一个算子反压,如果是key by之后算子反压,那么可能产生数据倾斜。
数据倾斜
数据倾斜也会引起反压,比如一个任务中有5个子任务,那么可能有某一个任务数据量太大产生反压,这种情况往往是数据倾斜导致的问题。有几个分区,就有几个并行度。
判断是否存在数据倾斜
相同 Task 的多个 Subtask 中,个别 Subtask 接收到的数据量明显大于其他Subtask 接收到的数据量,通过 Flink Web UI 可以精确地看到每个 Subtask 处理了多少数据,即可判断出 Flink 任务是否存在数据倾斜。通常,数据倾斜也会引起反压。
但是也可能产生数据倾斜,但是并没有发生反压,因为资源给的足够多。

在普罗米修斯做监控的时候,可以细化到监控每一个Task的子任务,有几个并行度,就有几个子任务,我们可以通过子任务数据量,判断是否发生数据倾斜。
数据倾斜的解决
keyBy 之前 发生 数据倾斜
如果 keyBy 之前就存在数据倾斜,上游算子的某些实例可能处理的数据较多,某些实例可能处理的数据较少,产生该情况可能是因为数据源的数据本身就不均匀,例如由于某些原因 Kafka 的 topic 中某些 partition 的数据量较大,某些 partition 的数据量较少。对于不存在 keyBy 的 Flink 任务也会出现该情况。
这种情况,需要让 Flink 任务强制进行 shuffle(随机分配数据)。使用 shuffle、rebalance(轮询分配数据) 或 rescale(范围轮询)算子即可将数据均匀分配,从而解决数据倾斜的问题。
也可以在上游的Kafka分区哪里进行解决,轮询分发数据。
keyBy 后的聚合操作存在数据倾斜
keyBy()是按照hash进行分组,和mapreduce中的shuffle,spark中的shuffle,hive中的keyBy()很像。
这是mr中的思想:
简单处理就是双重聚合,首先用随机数将数据打散,比如数据a有10万条数据,那么给数据a添加10个随机数,将数据a分10组,每一组数据量打散后就是10000,那么通过第一次,每一组的结果就是a-0,a-1局部聚合的结果,第二次reduce聚合就把后面的0,1,2随机数去掉,那么现在a有十条数据,针对这10条数据在做一次聚合即可。
如果在Flink中直接使用上面方法不做开窗是是用不了的。也就是直接聚合不做开窗无法使用。但是mr和spark可以使用。
Flink为什么不能使用双重聚合

假如上游有4个数据a,那么经过第二个算子后,会随机添加随机数,比如a1,a2,然后使用keyBy()操作后,a1被分到一个分区,a2被分到另一个分区,然后做变形(a1,1),(a2,1),那么数据发送到下游后,会流式聚合,然后下游数据聚合后就变为(a1,2),(a2,2),那么接下来会去掉伪随机数,变为(a,2),(a,2),(a,1),(a,1),下游进行keyBy()后,a的数据又重新发送到一个分区,发现数据量并没有减少,并没有做到数据量减少的目的,下游又发生数据倾斜。
另一个就是发现数据重复导致结果不对,看去掉伪随机数的数据,(a,1)发生重复,导致多次累加,所以结果不正确,这都是因为Flink是数据流的目的,因为数据在当前算子处理完毕后,会接着向下传递。
总的来说有两个问题:
- 并没有达到减少数据量的目的
- 重复计算,导致结果错误。

原始并行度中数据有4条,那么发送到下游带个并行度中,处理的数据不能少,也有4条。
第二个方框是添加伪随机数,然后根据伪随机数进行分区keyBy()操作,(a1,1)(a2,1)发送到上面的分区中,去掉伪随机数进行累加,另外两个(a1,1)(a2,1)发送到下面的分区中,累加,最终去掉多有的伪随机数,全部发送到下游单并行度,数据量没有减少,还会发生数据倾斜。但是有重复计算,重复数据。
注意,在两个分区中输出的结果是:(a,1),(a,2),所以导致结果错误。
问题就出现在Flink是流式处理,来一条数据就会处理并且输出一条数据,在中间分区哪里,每一个分区分别输出两次数据,导致结果不正确。所以如果在分区哪里,只输出最后结果,只输出一次结果就可以了。那么可以使用下面的思想:
使用 LocalKeyBy 的思想:在 keyBy 上游算子数据发送之前,首先在上游算子(比如map)的本地对数据进行聚合后再发送到下游(先在上游做一次聚合操作),使下游接收到的数据量大大减少,从而使得 keyBy 之后的聚合操作不再是任务的瓶颈。类似 MapReduce 中 Combiner 的思想,但是这要求聚合操作必须是多条数据或者一批数据才能聚合,单条数据没有办法通过聚合来减少数据量。
从 Flink LocalKeyBy 实现原理来讲,必然会存在一个积攒批次的过程,在上游算子中必须攒够一定的数据量,对这些数据聚合后再发送到下游。
注意:Flink 是实时流处理,如果 keyby 之后的聚合操作存在数据倾斜,且没有开窗口的情况下,简单的认为使用两阶段聚合,是不能解决问题的。因为这个时候 Flink 是来一条处理一条,且向下游发送一条结果,对于原来 keyby 的维度(第二阶段聚合)来讲,数据量并没有减少,且结果重复计算(非 FlinkSQL,未使用回撤流),如下图所示:

这张图表示的就是上面说存在的问题。
实现方式:以计算 PV 为例,keyby 之前,使用 flatMap 实现 LocalKeyby
使用LocalKeyBy 思想,在本地搞一个状态map, 然后来一条数据存储一条数据,然后写一个定时器,或者当map数据达到多少的时候,触发一次计算,把当前数据做一次累加,输出,这样就保证输出了一次,相当于在map端做了一次预聚合操作。也就是mr中的combiner()操作。流式数据无法在map端聚合所有的数据,因为数据源源不断的。
class LocalKeyByFlatMap extends RichFlatMapFunction<String, Tuple2<String,
//Checkpoint 时为了保证 Exactly Once,将 buffer 中的数据保存到该 ListState 中
private ListState<Tuple2<String, Long>> localPvStatListState;
//本地 buffer,存放 local 端缓存的 app 的 pv 信息
private HashMap<String, Long> localPvStat;
//缓存的数据量大小,即:缓存多少数据再向下游发送
private int batchSize;
//计数器,获取当前批次接收的数据量
private AtomicInteger currentSize;
//构造器,批次大小传参
LocalKeyByFlatMap(int batchSize){
this.batchSize = batchSize;
}
@Override
public void flatMap(String in, Collector collector) throws Exception {
// 将新来的数据添加到 buffer 中
Long pv = localPvStat.getOrDefault(in, 0L);
localPvStat.put(in, pv + 1);
// 如果到达设定的批次,则将 buffer 中的数据发送到下游
if(currentSize.incrementAndGet() >= batchSize){
// 遍历 Buffer 中数据,发送到下游
for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
collector.collect(Tuple2.of(appIdPv.getKey(), appIdPv.getValue()
}
// Buffer 清空,计数器清零
localPvStat.clear();
currentSize.set(0);
}
}
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotConte
// 将 buffer 中的数据保存到状态中,来保证 Exactly Once
localPvStatListState.clear();
for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
localPvStatListState.add(Tuple2.of(appIdPv.getKey(), appIdPv.ge
}
}
@Override
public void initializeState(FunctionInitializationContext context) {
// 从状态中恢复 buffer 中的数据
localPvStatListState = context.getOperatorStateStore().getListState
new ListStateDescriptor<>("localPvStat",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>})));
localPvStat = new HashMap();
if(context.isRestored()) {
// 从状态中恢复数据到 localPvStat 中
for(Tuple2<String, Long> appIdPv: localPvStatListState.get()){
long pv = localPvStat.getOrDefault(appIdPv.f0, 0L);
// 如果出现 pv != 0,说明改变了并行度,
// ListState 中的数据会被均匀分发到新的 subtask 中
// 所以单个 subtask 恢复的状态中可能包含两个相同的 app 的数据
localPvStat.put(appIdPv.f0, pv + appIdPv.f1);
}
// 从状态恢复时,默认认为 buffer 中数据量达到了 batchSize,需要向下游发
currentSize = new AtomicInteger(batchSize);
} else {
currentSize = new AtomicInteger(0);
}
}
}keyBy后的窗口
因为使用了窗口,变成了有界数据的处理,窗口默认是触发时才会输出一条结果发往下游,所以可以使用两阶段聚合的方式:
实现思路:
- 第一阶段聚合:key 拼接随机数前缀或后缀,进行 keyby、开窗、聚合
- 注意:聚合完不再是 WindowedStream,要获取 WindowEnd 作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起)
- 第二阶段聚合:去掉随机数前缀或后缀,按照原来的 key 及 windowEnd 作 keyby、聚合,因为第一阶段聚合之后,输出的数据量就会少很多。
一个窗口就相当于一次批处理,只会输出一次结果。keyBy 后的聚合操作存在数据倾斜这种方法,在map端设置定时器或者数据的条数,就相当于开了一个伪窗口。
窗口不存在数据量大和结果错误问题:
- 因为窗口式批处理,最终只会输出一条聚合的结果,而不像上面的那样,没来一条数据都会输出一个结果,结果式累加输出的,所以数据量本质没有减少。
- 不存在结果错误,因为窗口只会输出一个结果,不存在重复数据。

可以看到,数据打散之后,每一个窗口单独聚合输出一个结果。不会出现问题。
但是也会产生一种现象:

keyBy()开窗口聚合之后,做最终聚合的时候,会输出(a,4),(a,6),(a,8),但是我们期望得到的结果式(a,4)(a,4),也就式将多个窗口的结果放在一起做聚合操作。
解决方法:首先要获取 WindowEnd 或者windowstart作为窗口标记作为第二阶段分组依据,避免不同窗口的结果聚合到一起)
然后第二阶段聚合的时候,去掉随机数前缀或后缀,按照原来的 key 及 windowEnd 作 keyby、聚合,因为第一阶段聚合之后,输出的数据量就会少很多。这样做就把窗口信息也作为分组的一部分,比如有是个a,那0-5个a做聚合,5-10个a做聚合,最后做一次总的聚合。防止出现上图中出现的,局部做聚合。
KafkaSource调优
动态发现分区
当 FlinkKafkaConsumer 初始化时,每个 subtask 会订阅一批 partition,但是当Flink 任务运行过程中,如果被订阅的 topic 创建了新的 partition,FlinkKafkaConsumer如何实现动态发现新创建的 partition 并消费呢?
在使用 FlinkKafkaConsumer 时,可以开启 partition 的动态发现。通过 Properties指定参数开启(单位是毫秒):
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS该参数表示间隔多久检测一次是否有新创建的 partition。默认值是 Long 的最小值,表示不开启,大于 0 表示开启。开启时会启动一个线程根据传入的 interval 定期获取 Kafka最新的元数据,新 partition 对应的那一个 subtask 会自动发现并从 earliest 位置开始消费,新创建的 partition 对其他 subtask 并不会产生影响。
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTE
RVAL_MILLIS, 30 * 1000 + "");从 Kafka 数据源成生成 watermark
Kafka 单分区内有序,多分区间无序。在这种情况下,可以使用 Flink 中可识别 Kafka分区的 watermark 生成机制。使用此特性,将在 Kafka 消费端内部针对每个 Kafka 分区生成 watermark,并且不同分区 watermark 的合并方式与在数据流 shuffle 时的合并方式相同。
在单分区内有序的情况下,使用时间戳单调递增按分区生成的 watermark 将生成完美的全局 watermark。可以不使用 TimestampAssigner ,直接用 Kafka 记录自身的时间戳:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
"hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
"flinktest",
new SimpleStringSchema(),
properties
);
kafkaSourceFunction.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMinutes(2))
);
env.addSource(kafkaSourceFunction)设置空闲等待
如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着WatermarkGenerator 也不会获得任何新数据去生成 watermark。我们称这类数据源为空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。
比如 Kafka 的 Topic 中,由于某些原因,造成个别 Partition 一直没有新的数据。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark的最小值,则其 watermark 将不会发生变化,导致窗口、定时器等不会被触发。为了解决这个问题,你可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。

可以看到,有四个分区,但是有一个分区没有数据,那么他传输到下游的watermark只永远是long类型最小值,那么下游每一个分区的watermark值每一次都设置为最小值,所以永远不会触发计算。
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
"hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
"flinktest",new SimpleStringSchema(),
properties
);
kafkaSourceFunction.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMinutes(2))
.withIdleness(Duration.ofMinutes(5))//过期时间,分区中多长时间没有数据,那么下次就不考虑这个分区
);
env.addSource(kafkaSourceFunction)Kafka的 offset 消费策略
FlinkKafkaConsumer 可以调用以下 API,注意与”auto.offset.reset”(有新的消费者组加入生效,之前的offset失效或者不存在)区分开:
- setStartFromGroupOffsets():默认消费策略,默认读取上次保存的 offset 信息,如果 是 应 用 第 一次 启 动, 读 取 不 到 上次 的 offset 信息 , 则 会根 据这 个 参 数auto.offset.reset 的值来进行消费数据。建议使用这个。
- setStartFromEarliest():从最早的数据开始进行消费,忽略存储的 offset 信
- setStartFromLatest():从最新的数据进行消费,忽略存储的 offset 信息
- setStartFromSpecificOffsets(Map):从指定位置进行消费
- setStartFromTimestamp(long):从 topic 中指定的时间点开始消费,指定时间点之 前的数据忽略
- 当 checkpoint 机制开启的时候,KafkaConsumer 会定期把 kafka 的 offset 信息还有其他 operator 的状态信息一块保存起来。当 job 失败重启的时候,Flink 会从最近一次的 checkpoint 中进行恢复数据,重新从保存的 offset 消费 kafka 中的数据(也就是说,上面几种策略,只有第一次启动的时候起作用)。
- 为了能够使用支持容错的 kafka Consumer,需要开启 checkpoint,如果不开启检查点机制,那么什么一致性语义都不会生效。
FlinkSql调优
从flink1.12sql才完整,之前都不是很完整。
使用的式Flink cdc11。
Group Aggregate 优化
开启 MiniBatch (提升吞吐)
MiniBatch 是微批处理,原理是缓存一定的数据后再触发处理,以减少对 State 的访问,从而提升吞吐并减少数据的输出量。MiniBatch 主要依靠在每个 Task 上注册的 Timer 线程来触发微批,需要消耗一定的线程调度性能。
MiniBatch 默认关闭,开启方式如下:
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");适用场景
微批处理通过增加延迟换取高吞吐,如果有超低延迟的要求,不建议开启微批处理。通常对于聚合的场景,微批处理可以显著的提升系统性能,建议开启。
注意事项
- 目前,key-value 配置项仅被 Blink planner 支持。
- 1.12 之前的版本有 bug,开启 miniBatch,不会清理过期状态,也就是说如果设置状态的 TTL,无法清理过期状态。1.12 版本才修复这个问题。
开启 LocalGlobal (解决常见数据热点问题,数据倾斜)
LocalGlobal 优化将原先的 Aggregate 分成 Local+Global 两阶段聚合,即MapReduce 模型中的 Combine+Reduce 处理模式。第一阶段在上游节点本地攒一批数据进行聚合(localAgg),并输出这次微批的增量值(Accumulator)。第二阶段再将收到的 Accumulator 合并(Merge),得到最终的结果(GlobalAgg)。
LocalGlobal 本质上能够靠 LocalAgg 的聚合筛除部分倾斜数据,从而降低 GlobalAgg的热点,提升性能。结合下图理解 LocalGlobal 如何解决数据倾斜的问题。

由上图可知:
- 未开启 LocalGlobal 优化,由于流中的数据倾斜,Key 为红色的聚合算子实例需要处理更多的记录,这就导致了热点问题。
- 开启 LocalGlobal 优化后,先进行本地聚合,再进行全局聚合。可大大减少 GlobalAgg的热点,提高性能。
LocalGlobal 开启方式:
- LocalGlobal 优化需要先开启 MiniBatch,依赖于 MiniBatch 的参数。
- table.optimizer.agg-phase-strategy: 聚合策略。默认 AUTO,支持参数 AUTO、TWO_PHASE(使用 LocalGlobal 两阶段聚合)、ONE_PHASE(仅使用 Global 一阶段聚合)。
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");判断是否生效 观 察 最 终 生 成 的 拓 扑图 的 节 点 名 字 中 是 否包 含 GlobalGroupAggregate 或LocalGroupAggregate。
适用场景 LocalGlobal 适用于提升如 SUM、COUNT、MAX、MIN 和 AVG 等普通聚合的性能,以及解决这些场景下的数据热点问题。
注意事项:
- 需要先开启 MiniBatch
- 开启 LocalGlobal 需要 UDAF 实现 Merge 方法。
开启 Split Distinct (解决 COUNT DISTINCT 热点问题)
LocalGlobal 优化针对普通聚合(例如 SUM、COUNT、MAX、MIN 和 AVG)有较好的效果,对于 COUNT DISTINCT 收效不明显,因为 COUNT DISTINCT 在 Local 聚合时,对于 DISTINCT KEY 的去重率不高,导致在 Global 节点仍然存在热点。之前,为了解决 COUNT DISTINCT 的热点问题,通常需要手动改写为两层聚合(增加按 Distinct Key 取模的打散层)。
从 Flink1.9.0 版本开始,提供了 COUNT DISTINCT 自动打散功能,不需要手动重写。Split Distinct 和 LocalGlobal 的原理对比参见下图。

做两次分组操作。
统计一天的uv
传统做法
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day如果手动实现两阶段聚合:
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day- 第一层聚合: 将 Distinct Key 打散求 COUNT DISTINCT。也就是分组,重复的数据先做一个count()
- 第二层聚合: 对打散去重后的数据进行 SUM 汇总。经过第一次后,重复的数据已经count()过了,那么剩下的就是非重复的,在做一次count()即可。
Split Distinct 开启方式
默认不开启,使用参数显式开启:
- table.optimizer.distinct-agg.split.enabled: true,默认 false。
- table.optimizer.distinct-agg.split.bucket-num: Split Distinct 优化在第一层聚合中,被打散的 bucket 数目。默认 1024。
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");判断是否生效
观察最终生成的拓扑图的节点名中是否包含 Expand 节点,或者原来一层的聚合变成了两层的聚合。
适用场景 使用 COUNT DISTINCT,但无法满足聚合节点性能要求。
注意事项:
- 目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 优化方法。
- 拆分出来的两个 GROUP 聚合还可参与 LocalGlobal 优化。
- 从 Flink1.9.0 版本开始,提供了 COUNT DISTINCT 自动打散功能,不需要手动重写(不用像上面的例子去手动实现)。
为 改写为 AGG WITH FILTER 量 语法(提升大量 COUNT DISTINCT 场景性能)
在某些场景下,可能需要从不同维度来统计 UV,如 Android 中的 UV,iPhone 中的UV,Web 中的 UV 和总 UV,这时,可能会使用如下 CASE WHEN 语法。
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE
NULL END) AS app_uv,
COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL
END) AS web_uv
FROM T
GROUP BY day在这种情况下,建议使用 FILTER 语法, 目前的 Flink SQL 优化器可以识别同一唯一键上的不同 FILTER 参数。如,在上面的示例中,三个 COUNT DISTINCT 都作用在 user_id列上。此时,经过优化器识别后,Flink 可以只使用一个共享状态实例,而不是三个状态实例,可减少状态的大小和对状态的访问。
将上边的 CASE WHEN 替换成 FILTER 后,如下所示:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY dayTopN 优化
使用最优
当 TopN 的输出是非更新流(例如 Source),TopN 只有一种算法 AppendRank。当TopN 的输出是更新流时(例如经过了 AGG/JOIN 计算),TopN 有 2 种算法,性能从高到低分别是:UpdateFastRank 和 RetractRank。算法名字会显示在拓扑图的节点名字上。

注意:apache社区版的Flink1.12 目前还没有UnaryUpdateRank,阿里云实时计算版Flink

UpdateFastRank :最优算法
需要具备 2 个条件:
- 输入流有 PK(Primary Key)信息,例如 Group BY AVG。
- 排序字段的更新是单调的,且单调方向与排序方向相反。例如,ORDER BYCOUNT/COUNT_DISTINCT/SUM(正数)DESC。
单调是什么意思,比如count(*)就是单调的,随着计算,这个值在慢慢的变大,不会变小。sum(正数)单调递增。
与排序方向相反,意思是比如我们count(*)值是单调递增的,那么我们排序时候,就要从大到小,要和单调递增的字段相反。
如果要获取到优化 Plan,则您需要在使用 ORDER BY SUM DESC 时,添加 SUM 为正数的过滤条件。
AppendFast:结果只追加,不更新RetractRank:普通算法,性能差
不建议在生产环境使用该算法。请检查输入流是否存在 PK 信息,如果存在,则可进行UpdateFastRank 优化。
无排名优化 ( 解决数据膨胀问题)
TopN 语法:
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]数据膨胀问题:
根据 TopN 的语法,rownum 字段会作为结果表的主键字段之一写入结果表。但是这可能导致数据膨胀的问题。例如,收到一条原排名 9 的更新数据,更新后排名上升到 1,则从 1 到 9 的数据排名都发生变化了,需要将这些数据作为更新都写入结果表。这样就产生了数据膨胀,导致结果表因为收到了太多的数据而降低更新速度。
使用方式
TopN 的输出结果无需要显示 rownum 值,仅需在最终前端显式时进行 1 次排序,极大地减少输入结果表的数据量。只需要在外层查询中将 rownum 字段裁剪掉即可,如果输出排序列吗,那么就要把表中的数据排好序输出,如果不输出排序列,其他的输出不变,只需要先输出变化的那一条数据。
// 最外层的字段,不写 rownum,加上排序字段降低性能
SELECT col1, col2, col3
FROM (
SELECT col1, col2, col3
ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]在无 rownum 的场景中,对于结果表主键的定义需要特别小心。如果定义有误,会直接导致 TopN 结果的不正确。 无 rownum 场景中,主键应为 TopN 上游 GROUP BY 节点的 KEY 列表。
增加 TopN 的 Cache 大小
TopN 为了提升性能有一个 State Cache 层,Cache 层能提升对 State 的访问效率。
TopN 的 Cache 命中率的计算公式为。
cache_hit = cache_size*parallelism/top_n/partition_key_num例如,Top100 配置缓存 10000 条,并发 50,当 PatitionBy 的 key 维度较大时,例如10 万级别时,Cache 命中率只有 10000*50/100/100000=5%,命中率会很低,导致大量的请求都会击中 State(磁盘),性能会大幅下降。因此当 PartitionKey 维度特别大时,可以适当加大TopN的CacheS ize,相对应的也建议适当加大TopN节点的Heap Memory。
使用方法
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 默 认 10000 条 , 调 整 TopN cahce 到 20 万 , 那 么 理 论 命 中 率 能 达
200000*50/100/100000 = 100%
configuration.setString("table.exec.topn.cache-size", "200000");PartitionBy 的字段中要有时间类字段
例如每天的排名,要带上 Day 字段。否则 TopN 的结果到最后会由于 State ttl 有错乱。
优化后的 SQL 示例
insert
into print_test
SELECT
cate_id,
seller_id,
stat_date,
pay_ord_amt --不输出 rownum 字段,能减小结果表的输出量(无排名优化)
FROM (
SELECT
*,
ROW_NUMBER () OVER (
PARTITION BY cate_id,
stat_date --注意要有时间字段,否则 state 过期会导致数据错乱(分区字段优化)
ORDER
BY pay_ord_amt DESC --根据上游sum结果排序。排序字段的更新是单调的,
且单调方向与排序方向相反(走最优算法)
) as rownum
FROM (
SELECT
cate_id,
seller_id,
stat_date,
--重点。声明 Sum 的参数都是正数,所以 Sum 的结果是单调递增的,因此 TopN
能使用优化算法,只获取前 100 个数据(走最优算法)
sum (total_fee) filter (
where
total_fee >= 0
) as pay_ord_amt
FROM
random_test
WHERE
total_fee >= 0
GROUP
BY cate_name,
seller_id,
stat_date
) a
WHERE
rownum <= 100
);高效去重方案
第一种去重像日活,或者新增,我们只要第一条数据即可,这也是去重,还有一种去重就是相同数据去重。
由于 SQL 上没有直接支持去重的语法,还要灵活的保留第一条或保留最后一条。因此我们使用了 SQL 的 ROW_NUMBER OVER WINDOW 功能来实现去重语法。去重本质上是一种特殊的 TopN。
保留首行的去重策略(Deduplicate Keep FirstRow )
保留 KEY 下第一条出现的数据,之后出现该 KEY 下的数据会被丢弃掉。因为 STATE 中只存储了 KEY 数据,所以性能较优,示例如下:
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
FROM T
)
WHERE rowNum = 1;
//也就是按照b字段分区,然后取每一个分区中的第一条数据,这样状态中只保留一条数据,时间最小的数据,效率高
//如果保留最后一条数据,倒序取出第一条数据以上示例是将 T 表按照 b 字段进行去重,并按照系统时间保留第一条数据。Proctime在这里是源表 T 中的一个具有 Processing Time 属性的字段。如果按照系统时间去重,也可以将 Proctime 字段简化 PROCTIME()函数调用,可以省略 Proctime 字段的声明。
保留末行的去重策略(Deduplicate Keep LastRow )
保留KEY下最后一条出现的数据。保留末行的去重策略性能略优于LAST_VALUE 函数,示例如下:
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as
rowNum
FROM T
)
WHERE rowNum = 1;以上示例是将 T 表按照 b 和 d 字段进行去重,并按照业务时间保留最后一条数据。Rowtime 在这里是源表 T 中的一个具有 Event Time 属性的字段。
系统内置函数
使用内置函数替换自定义函数
Flink 的内置函数在持续的优化当中,请尽量使用内部函数替换自定义函数。使用内置 函数好处:
- 优化数据序列化和反序列化的耗时。
- 新增直接对字节单位进行操作的功能。
LIKE 操作注意事项
- 如果需要进行 StartWith 操作,使用 LIKE 'xxx%'。
- 如果需要进行 EndWith 操作,使用 LIKE '%xxx'。
- 如果需要进行 Contains 操作,使用 LIKE '%xxx%'。
- 如果需要进行 Equals 操作,使用 LIKE 'xxx',等价于 str = 'xxx'。
- 如果需要匹配 _ 字符,请注意要完成转义 LIKE '%seller/id%' ESCAPE '/'。_在 SQL中属于单字符通配符,能匹配任何字符。如果声明为 LIKE '%seller_id%',则不单会匹配 seller_id 还会匹配 seller#id、sellerxid 或 seller1id 等,导致结果错误。
慎用正则函数(REGEXP )
正则表达式是非常耗时的操作,对比加减乘除通常有百倍的性能开销,而且正则表达式在某些极端情况下可能会进入无限循环,导致作业阻塞。建议使用 LIKE。正则函数包括:
- REGEXP
- REGEXP_EXTRACT
- REGEXP_REPLACE
设置参数总结
总结以上的调优参数,代码如下:
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 开启 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// TopN 的缓存条数
configuration.setString("table.exec.topn.cache-size", "200000");
// 指定时区
configuration.setString("table.local-time-zone", "Asia/Shanghai");如何聊优化
做过哪些优化或者解决过什么问题,或者遇到什么问题?这些都是在问优化问题。
- 第一步,说明业务场景,在做什么业务,或者执行什么sql的时候。
- 第二步:遇到了什么问题,往往通过监控工具或者报警系统,发现了什么问题。
- 第三步:排查问题,如果是反压,可能是数据量真的很大,之前还没有做过压测,导致现在所有的并行度全部出现了反压,高峰期扛不住,所以这种情况只能通过添加机器来解决。如果某一个并行度或者任务出现反压,那么可能出现了数据倾斜,我们针对数据倾斜,做了什么解决。如果是mr任务,那么可以通过日志查看,有一两个任务由于数据量大执行特别慢,而其他任务已经执行完成。少数任务卡在了99%.在Flink中是通过反压方式报警提醒。
- 第四步:解决问题,说明解决问题,使用了什么方案解决。
贡献者
版权所有
版权归属:codingLab
许可证:bugcode