Flink 常见面试问题
本文最后更新于 2024年1月10日 凌晨
Flink 常见面试问题
Flink 介绍
Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。并且 Flink 提供了数据分布、容错机制以及资源管理等核心功能。Flink 提供了诸多高抽象层的 API 以便用户编写分布式任务:
DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。
DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。
Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。
此外,Flink 还针对特定的应用领域提供了领域库,例如: Flink ML,Flink 的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。 Gelly,Flink 的图计算库,提供了图计算的相关API及多种图计算算法实现。
Flink 与 SparkStreaming 的对别
Apache Flink 和 Apache Spark Streaming 是两个流行的分布式数据处理框架,它们都支持实时流式计算,但架构和设计理念存在重要区别。以下是一些关键点来比较 Flink 和 Spark Streaming:
处理模型:
- Spark Streaming:基于微批处理(Micro-Batch)模型,它把数据分成一系列短暂的、小批量的作业来模拟流处理。它通过其 DStream (离散流)抽象提供流式处理。Spark Streaming 的微批量处理可以导致固有的延迟。
- Flink:作为一个真正的流处理框架,Flink 是基于事件的,提供了真正意义上的实时流处理。它处理数据的方式是连续的,每个事件都被单独处理,理论上可以减少延迟,达到接近实时的数据处理。
时间机制:
- Spark Streaming:在较早的版本中主要支持处理时间(Processing Time),它以进入系统的时间作为参考。
- Flink:提供了更多的时间语义支持,包括事件时间(Event Time)、摄取时间(Ingestion Time)和处理时间(Processing Time)。Flink 支持 Watermark 机制,允许处理有界乱序的事件流,这对于正确处理事件时间非常重要。
架构和调度:
- Spark Streaming:由于它是 Spark 的一个组件,它共享 Spark 的资源管理和调度框架。Spark Streaming 应用程序在启动时创建一个 DStreamGraph,然后周期性地生成微批次作业,并提交给集群进行处理。
- Flink:采用自己的调度和部署体系。Flink JobManager 负责作业调度和资源管理,而 TaskManager 负责执行任务。Flink 生成流图(StreamGraph),后续优化为任务图(JobGraph),再进一步转化为执行图(ExecutionGraph)用于具体任务的调度。
容错和一致性:
- Spark Streaming:通过微批处理模型和周期性的 checkpoint 机制支持容错。但是未能提供恰好一次(Exactly-Once)语义,可能会存在至少一次(At-Least-Once)的处理语义,有重复数据处理的可能。
- Flink:默认提供了恰好一次处理语义,使用了两阶段提交协议(如预写日志)以及状态管理来保证状态的一致性和数据的正确性,即便出现失败。
使用场景:
- Spark Streaming:适合对于不是严格实时要求的批处理和批流混合作业。因为是批处理的延伸,Spark Streaming 对于那些已经在使用 Spark 处理批处理任务的用户来说是一个不错的选择。
- Flink:适合对低延迟和高吞吐量有严格要求的场合,如金融领域的实时风控、网络监控等领域。Flink 所提供的事件时间处理和窗口机制特别适合这些对时间敏感的应用。
两个框架随着时间的推移都在不断地发展,Spark 后续引入了结构化流(Structured Streaming),提供了更接近实时处理的支持,并尝试提供低延迟的处理。而 Flink 也一直在提升其易用性和批处理能力。选择哪个框架取决于应用场景、团队经验以及对延迟的要求等多个因素。
系统架构
Apache Flink 是一款开源的流处理框架,能够以很高的吞吐率和低延迟来处理大规模数据流,并且同样支持批处理。Flink 运行时的架构主要包含以下几个核心组件:
Client(客户端): 客户端作为用户交互的接口,用于准备和发送 Flink 作业(Jobs)到集群。作业提交给 JobManager 后,客户端可以选择断开连接(对于流式作业)或者等待作业执行的结果(对于批处理作业)。
JobManager(作业管理器): JobManager 是 Flink 集群的主控节点,负责接收客户端提交的作业,并负责整个作业的调度和协调。当 JobManager 接收到作业时,它会将作业解析成执行图并根据资源状态进行任务调度,此外,JobManager 还负责故障恢复,如重启失败的任务和协调 checkpoint。
TaskManager(任务管理器): TaskManager 是工作节点,用来执行 JobManager 分配下来的任务(Tasks)。每个 TaskManager 拥有一个或多个 Slots,这些 Slots 定义了 TaskManager 可以并行执行的任务数量。TaskManager 负责任务执行的数据处理和状态管理,并要定期向 JobManager 报告心跳和统计信息。
Dispatcher(调度器): 在 Flink 1.5版本之后加入。Dispatcher 运行在 JobManager 中,负责接收来自客户端的 JobGraph,并为每个提交的 JobGraph 启动一个新的 JobManager。
ResourceManager(资源管理器): ResourceManager 负责管理和分配集群中的资源。在与 YARN 或 Mesos 等外部资源管理器集成时,它负责与外部系统协商资源以运行 Flink 作业。
Network(网络组件): Flink 的网络组件基于 Netty 框架实现,负责各个 TaskManager 节点之间的数据传输。数据在节点之间以管道(pipeline)的形式以流的方式进行交换。
Flink采用主从架构,在分布式环境中,可以横向扩展来适应不同的处理需求。通过构成精细化的任务(Task)结构和高效的数据分发与通信机制,Flink可以达到高吞吐的大规模数据处理效果。此外,对于状态管理和容错,Flink提供了轻量级的分布式快照机制来支持精确一次处理的语义。
Flink 的编程模型
Apache Flink 的编程模型是围绕流式处理的抽象概念构建的,并且提供了统一的批处理和流处理的编程方式。在高层次上,Flink 程序结构通常遵循以下模式:
Environment:环境是编写 Flink 程序的起点,它定义了执行程序的上下文。通常,程序开始于创建一个
StreamExecutionEnvironment
实例,用于流式处理,或者是ExecutionEnvironment
,用于批处理。这个环境设置了任务的一些基本参数,比如并行度和检查点策略,并提供了方法来创建初始的数据流。示例代码(流处理环境):
1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Source:数据源决定了数据进入 Flink 程序的起点,也即是输入的部分。Flink 提供了多种内置源(例如,从文件、数据库、消息队列、Socket 流等读取数据),同时也支持自定义数据源。
示例代码(从文件中读取数据):
1
DataStream<String> source = env.readTextFile("path/to/input");
Transform:转换操作用于对数据进行处理。这可能包括映射(map)、过滤(filter)、聚合(aggregate)、连接(join)、窗口(window)操作等。Flink 提供了丰富的 API 来支持各种转换操作,同时也支持开发者定义自己的复杂转换逻辑。
示例代码(映射操作):
1
2
3
4
5
6
7
8DataStream<Tuple2<String, Integer>> transformed = source
.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
String[] words = value.split("\\s");
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}
})
.returns(Types.TUPLE(Types.STRING, Types.INT));Sink:数据接收器定义了数据流离开 Flink 程序的终点,数据的输出或存储部分。Flink 提供了多种内置的 Sink,如写入文件、数据库等,同时也支持自定义 Sink。
示例代码(把结果写入文件):
1
transformed.writeAsCsv("path/to/output", FileSystem.WriteMode.OVERWRITE);
执行:定义好上述部分之后,通过
env.execute()
方法触发程序执行。这一步会将前面定义的作业提交至 Flink 集群开始执行。示例代码(触发程序执行):
1
env.execute("My Flink Job");
Flink 提供的分层 API 包括如下:
- DataStream API:用于构建流处理程序,其程序模型强调事件的时间属性和流的连续性。
- DataSet API:用于批处理场景,强调静态数据和有限的数据集。
- Table & SQL API:提供类似 SQL 的声明式编程方式,可以透明的处理流和批数据,方便进行数据分析和 ETL 操作。
- ProcessFunction API:为用户提供了一个底层 API,可以访问时间戳、watermarks 以及注册定时事件,允许实现复杂的业务逻辑。
选择 API 的级别通常基于数据处理任务的复杂性和具体需求,更高级别的 API 提供了更简洁快速的开发路径,更底层的 API 则提供了更多的控制和灵活性。在设计 Flink 程序时,选择合适层次的 API 对获取最佳性能和易用性至关重要。
作业执行流程
Flink 的作业执行流程如下,以 Yarn 模式 Per-job 方式为例:
- 代码转化为 JobGraph: 当执行
executor()
时,首先在本地 client 中将代码转化为可以提交的 JobGraph。 - 启动 Application Master (AM): 如果提交为 Per-Job 模式,首先需要启动 AM。Client 向资源系统申请资源,在 Yarn 下即为申请 container 开启 AM。如果是 Session 模式,不需要这个步骤。
- Yarn 分配资源,开启 AM: Yarn 分配资源,开启 AM,该 AM 负责整个作业的调度和管理。
- Job 提交给 Dispatcher: Client 将 Job 提交给 Dispatcher。
- Dispatcher 启动 JobManager (JM)线程: Dispatcher 开启一个新的 JM 线程,负责整个 Job 的执行。
- JM 向 Flink ResourceManager 申请资源: JM 向 Flink 自己的 ResourceManager 申请 slot 资源来执行任务。
- ResourceManager 向 Yarn 申请资源启动 TaskManager (TM): ResourceManager 向 Yarn 申请资源来启动 TaskManager。在 Session 模式下,跳过此步骤。
- Yarn 分配 Container 启动 TM: Yarn 分配 Container 来启动 TaskManager。在 Session 模式下,跳过此步骤。
- ResourceManager 向 TM 申请 slot 资源启动 Task: Flink 的 ResourceManager 向 TaskManager 申请 slot 资源来启动 Task。
- TM 提供 slot 资源给 JM,JM 提交 Task: TaskManager 将待分配的 slot 提供给 JobManager,然后 JobManager 提交 Task。TaskManager 会启动新的线程来执行任务,并通过 shuffle 模块进行 Task 之间的数据交换。
通讯过程
Task 调度
内存模型
Exactly Once 实现
在谈到 Flink 所实现的 exactly-once 语义时,主要包括两个层面:
状态 Exactly-Once:
Flink 提供了 exactly-once 的状态(state)投递语义,这为有状态的计算提供了准确性保证。在这里,“exactly-once”并不意味着 Flink 中的所有事件仅会处理一次,而是指所有事件对生成的状态只有一次影响。在上图中,假设每两条消息后触发一次 checkpoint 操作,持久化一次 state。如果 TaskManager 在处理完事件 c 后被关闭,当 JobManager 重新启动任务时,TaskManager 将从 checkpoint 1 处恢复状态,重新执行流处理。因此,事件 c 确实会被再一次处理。这里的一致性语义是指进入 Flink 系统的事件只会被一次记录到 state 并 checkpoint,而 state 永远不会发生重复消费。这被称为状态 Exactly-Once。
端到端 Exactly-Once:
Flink 1.4 版本引入了重要特性 TwoPhaseCommitSinkFunction,该特性使构建端到端 Exactly-Once 的 Flink 程序成为可能。TwoPhaseCommitSinkFunction 抽取了两阶段提交协议的公共部分,用户需要手动实现 Exactly-Once 语义。为了提供端到端 Exactly-Once 语义,除了 Flink 应用程序本身的状态外,Flink 写入的外部存储也需要满足这个语义。这意味着外部系统必须提供提交或回滚的方法,并通过 Flink 的 checkpoint 来协调。
总体而言,状态 Exactly-Once 保证 Flink 应用程序内部状态的一致性,而端到端 Exactly-Once 则通过 TwoPhaseCommitSinkFunction 和外部系统的协同工作,实现了整个数据流处理任务的 Exactly-Once 语义。
Window 机制
Flink 中窗口的实现机制涉及三个主要组件:
- Window Assigner(窗口分配器):
- 用于决定将元素分配到哪个/哪些窗口中。
- 可能会创建新的窗口,一个元素可以被分配到多个窗口中。
- Trigger(触发器):
- 决定窗口何时进行计算或清除。
- 每个窗口都有自己的触发器,触发器上有定时器,用于决定何时触发窗口计算。
- 触发器的返回结果可以是 continue(不做任何操作)、fire(处理窗口数据)、purge(移除窗口和窗口中的数据)或 fire + purge。
- Evictor(剔除器):
- 在触发器触发之后,在窗口被处理之前,Evictor(如果存在)用于剔除窗口中不需要的元素,相当于一个过滤器。
- 用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。
窗口的实现流程:
- 数据流源源不断地进入算子(window operator)。
- Window Assigner 决定元素被放到哪个/哪些窗口中。
- 窗口中的元素实际存储在 Key/Value State 中,为了保证窗口的容错性,依赖于 Flink 的 State 机制。
- 当触发器触发时,窗口中的元素交给 Evictor 进行过滤。
- 过滤后的元素被交给用户指定的函数进行窗口的计算。
- 计算函数计算出窗口的结果值,并发送给下游。
对于聚合类的窗口计算,如 sum、min,Flink 进行了优化。聚合类的计算不需要将窗口中的所有数据都保存下来,只需保存一个结果值即可。每个进入窗口的元素执行一次聚合函数并修改结果值。这种优化降低了内存消耗并提升了性能。但是如果用户定义了 Evictor,则不会启用对聚合窗口的优化,因为 Evictor 需要遍历窗口中的所有元素,必须将所有元素都保存下来。
Window 分类
flink 中的窗口主要分为 3 大类共 5 种窗口:
Time Window 时间窗口
- Tumbing Time Window 滚动时间窗口
实现统计每一分钟 (或其他长度)窗口内计算的效果 - Sliding Time Window 滑动时间窗口
实现每过 xxx 时间统计 xxx 时间窗口的效果. 比如,我们可以每 30 秒计算一次最近一分钟用户购买的商品总数。
Count Window 计数窗口
- Tumbing Count Window 滚动计数窗口
当我们想要每 100 个用户购买行为事件统计购买总数,那么每当窗口中填满 100 个元素了,就会对窗口进行计算,这种窗口我们称之为翻滚计数窗口(Tumbling Count Window) - Sliding Count Window 滑动计数窗口
和 Sliding Time Window 含义是类似的,例如计算每 10 个元素计算一次最近 100 个元素的总和
Session Window 会话窗口
在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。如上图所示,就是需要计算每个用户在活跃期间总共购买的商品数量,如果用户 30 秒没有活动则视为会话断开(假设 raw data stream 是单个用户的购买行为流)
三种时间语义
Event Time: 是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。
Ingestion Time: 是数据进入 Flink 的时间。
Processing Time: 是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time。
KeyBy 的分区原理
在 Apache Flink 中,keyBy
操作是流处理中必不可少的操作之一,其主要作用是根据指定的键把流数据分区,以便能将具有相同键值的数据项发送到同一任务(task)进行处理。这是实现聚合操作(如 sum
、reduce
等)的前提条件。keyBy
操作的内部实现包含以下几个步骤:
调用 Key 函数:Flink 程序中使用
keyBy
时需要指定一个或多个字段作为键,Flink 会调用这些字段的hashCode
方法来计算每个数据项的哈希值。应用 MurmurHash:计算得到的初步哈希值会被传递给一个哈希函数,Flink 默认使用 MurmurHash 这种高性能、低碰撞概率的非加密哈希函数进行再次哈希处理,以得到一个更加均匀分布的哈希值,这个过程用来降低潜在的哈希碰撞并优化数据的分布。
计算目标分区:得到二次哈希值后,Flink 通过如下公式来确定数据项应该被发送到下游算子的哪个分区:
1 |
|
其中,键组 ID 就是通过 MurmurHash 计算得到的哈希值,最大并行度通常是一个预定义的数值(默认情况下是 128),表示系统可以处理的最大并行任务数。下游算子并行度则是下一阶段操作的并行任务数(例如,一个 reduce
操作可能有多个并行实例)。
通过这个分区方法,Flink 能确保具有相同键的数据项路由到同一个分区,并由同一个下游算子的相同实例处理。这个过程是分布式计算中”分而治之”策略的实际应用,它不仅确保按键的相关操作能有效进行,还可以最大化并行处理的效率,以及提高整体作业的处理吞吐量。
keyBy
的分区原理通过这种策略,使得数据流的分布更加均匀,减少数据倾斜的风险,是 Flink 流处理编程模型中的核心机制之一。
分区、分组的区别
在数据处理和分布式计算领域,分区(Partitioning)和分组(Grouping)是两个重要概念,尽管它们在某些情况下可以互换使用,但它们在语义和用途上有明显区别:
分区(Partitioning)
分区是指在物理层面上,如何将数据切割并分散到多个计算节点上。分区是关于数据存储和处理的位置,与数据处理的并行性密切相关。每个分区可以看作是数据的一个子集,它运行在不同的机器或处理器上,允许并行处理,从而提升性能。在分布式系统中,每个节点通常处理一个或多个分区的数据。
在 Apache Flink 中的分区通常关联到特定算子的并行实例。当一个算子如 map()
或 filter()
配置为并行执行时,它的数据就会被划分到多个分区中,每个分区由独立的任务(task)处理。
分组(Grouping)
分组是逻辑上的划分,它根据数据中的某些属性(比如键)将数据分类。分组通常用在需要按照特定条件收集和处理数据的情况,如聚合操作(sum
、count
、average
等)。相关数据项会根据键值分到相同的组中进行处理,这就允许例如对每个用户的所有购买行为进行汇总。
在分布式数据处理框架(如 Apache Flink 或 Apache Spark)中,执行 groupBy
或 keyBy
操作会根据键值逻辑上将数据编排到不同的组里。然后,分组后的数据可以在相应算子的每个并行实例中进行进一步处理。
相关性
尽管分区和分组概念上不同,但它们之间是有联系的。在流式处理框架中,通过逻辑上的分组定义后,通常会将分组对应的数据发送到特定的物理分区中处理。能够高效地把同一个逻辑分组的数据映射到相同的物理分区,这在 Flink 或 Spark 的有状态操作中尤为重要,因为它可以减少跨节点通信,优化性能。
总结
- 分区反映数据如何物理分散到集群内不同节点的处理单元上,与处理作业的并行性和性能优化紧密关联。
- 分组是数据在逻辑上根据键或某种属性被划分为不同类别,这对聚合和有状态操作很重要。
- 一个分区可以有多个分组,同一个分组的数据肯定在同一个分区。
在设计和实现大规模数据处理系统时,正确认识和利用分区和分组的区别,对提高系统的扩展性和性能至关重要。
State 的存储位置
Apache Flink 内部的 state 可以存储在不同的后端,具体有以下几种实现:
- HeapStateBackend(基于内存的):
- 存储在内存中。
- 主要用于调试模式,不建议在生产环境中使用,因为数据量大时会导致内存压力。
- FsStateBackend(基于 HDFS 的):
- 分布式文件系统(如 HDFS)中进行持久化。
- 每次读写都需要进行网络 IO,性能相对较低,通常不推荐在生产环境中使用。
- RocksDBStateBackend(基于 RocksDB 的):
- 本地文件系统 + 异步 HDFS 持久化。
- 使用 RocksDB 作为存储引擎,可以在本地文件系统上进行快速的读写操作,同时支持异步将数据持久化到 HDFS,提供了更好的性能和可靠性。
- NiagaraStateBackend(基于 Niagara 的,Alibaba 内部实现):
- 分布式持久化,主要在 Alibaba 的生产环境中应用。
- Alibaba 内部自行实现的分布式状态后端,适用于大规模、高并发的生产环境。
选择 state 后端通常取决于具体的使用场景和性能需求。在生产环境中,RocksDBStateBackend 是一个常见的选择,因为它结合了本地文件系统和异步 HDFS 持久化的优势。
反压机制
Flink 实现反压的方式经历了两个阶段:基于 TCP 的反压(版本 1.5 之前)和基于 credit 的反压(版本 1.5及之后)。
基于 TCP 的反压(<1.5 版本)
在这个阶段,Flink 使用 TCP 滑动窗口机制实现反压。具体来说,消息的发送和接收分别通过 ResultPartition(RS)和 InputGate(IC)来完成。数据在本地通过 LocalBufferPool 进行存储和提取,而底层则依赖于 Netty 的 NetworkBufferPool。当 InputGate 端的 buffer 池满了之后,两个任务之间的滑动窗口大小变为 0,这意味着 ResultPartition 端无法再发送数据。
然而,这种基于 TCP 的反压机制存在一个问题,即它会导致整个 TaskManager 的反压,因为所有任务都受到影响。
基于 Credit 的反压(>1.5 版本)
为了解决基于 TCP 反压的问题,Flink 在 1.5 版本之后引入了基于 credit 的反压机制。在这个阶段,RS 和 IC 之间通过 backlog 和 credit 来提前感知双方可以发送和接受的数据量的大小。这样,可以更灵活地控制反压,而不是通过 TCP 滑动窗口。
具体来说,credit 机制允许 Receiver(IC)通知 Sender(RS)它能够接收的数据量,从而在系统中实现动态的反压控制。这种方式更为灵活和精细化,能够减小不同任务之间的影响范围,提高系统的整体性能。
部署模式
Flink 支持多种部署模式,其中一些主要的包括:
- Standalone 模式: 这是最简单的模式,适用于单机或本地开发环境。在这种模式下,Flink 会启动一个 Standalone 集群,包括 JobManager 和 TaskManager。
- YARN 模式: 在 YARN 模式下,Flink 可以在 Hadoop 集群上运行。YARN(Yet Another Resource Negotiator)是 Hadoop 的资源管理器,可以用于分布式计算。
- Mesos 模式: Mesos 是一个开源的集群管理系统,Flink 可以在 Mesos 上运行作业。Mesos 提供了对集群资源的细粒度控制。
- Kubernetes 模式: Kubernetes 是一个开源的容器编排系统,Flink 可以在 Kubernetes 上以容器化的方式运行。这种模式提供了更灵活的资源管理和部署选项。
- Docker 模式: Flink 可以作为 Docker 容器运行,这样可以更轻松地在不同环境中部署和运行。
- AWS 模式: Flink 提供了在 Amazon Web Services(AWS)上运行的支持,可以方便地与 AWS 的服务集成。
- Google Compute Engine 模式: Flink 也支持在 Google Cloud Platform 的 Compute Engine 上运行。
- MAPR 模式: Flink 可以与 MapR 集成,利用 MapR 作为底层存储和计算引擎。
这些部署模式可以根据具体的需求和环境选择,例如在云服务上使用 AWS 或 Google Compute Engine,或者在本地集群上使用 Standalone 或者 YARN。
Flink on Yarn 的部署
Flink 作业提交有两种类型:
yarn session
需要先启动集群,然后在提交作业,接着会向 yarn 申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到 yarn 中的其中一个作业执行完成后,释放了资源,那下一个作业才会正常提交.- 客户端模式
对于客户端模式而言,你可以启动多个 yarn session,一个 yarn session 模式对应一个 JobManager, 并按照需求提交作业,同一个 Session 中可以提交多个 Flink 作业。如果想要停止 Flink Yarn Application,需要通过 yarn application -kill 命令来停止. - 分离式模式
对于分离式模式,并不像客户端那样可以启动多个 yarn session,如果启动多个,会出现下面的 session 一直处在等待状态。JobManager 的个数只能是一个,同一个 Session 中可以提交多个 Flink 作业。如果想要停止 Flink Yarn Application,需要通过 yarn application -kill 命令来停止
- 客户端模式
Flink run (Per-Job)
直接在 YARN 上提交运行 Flink 作业 (Run a Flink job on YARN),这种方式的好处是一个任务会对应一个 job, 即没提交一个作业会根据自身的情况,向 yarn 申请资源,直到作业执行完成,并不会影响下一个作业的正常运行,除非是 yarn 上面没有任何资源的情况下
EventTime 和 ProcessTime 的区别
在 Flink 中,有三种时间概念:Processing Time、Event Time 和 Ingestion Time。这三者之间的区别主要在于时间的来源和处理方式。
1. Processing Time(处理时间)
- 定义: 指事件被处理时机器的系统时间。
- 特点: 所有基于时间的操作,如时间窗口,都使用当时机器的系统时间。
- 优点: 实时性强,不受事件的乱序和延迟影响。
- 缺点: 不适用于需要考虑事件的实际发生时间的场景,容易受到机器时间的波动影响。
2. Event Time(事件时间)
- 定义: 事件发生的实际时间,通常是数据本身携带的时间。
- 特点: 事件时间是在事件到达 Flink 之前就确定的,每个事件都携带有事件时间戳。
- 优点: 适用于需要考虑事件实际发生时间的场景,不受机器时间波动和乱序的影响。
- 缺点: 需要指定如何生成 Event Time 水印,处理乱序和延迟较为复杂。
3. Ingestion Time(摄入时间)
- 定义: 事件进入 Flink 系统的时间。
- 特点: 在源操作处,每个事件将源的当前时间作为时间戳,基于时间的操作使用这个时间戳。
- 优点: 在某种程度上折中了 Processing Time 和 Event Time 的优缺点,更可预测,不容易受到机器时间波动的影响。
- 缺点: 无法处理无序事件或延迟数据,程序不必指定如何生成水印。
选择使用哪种时间取决于具体的业务场景和需求。如果需要考虑事件的实际发生时间并处理乱序和延迟,那么 Event Time 是更为合适的选择。如果对实时性要求较高,可以选择 Processing Time。而 Ingestion Time 在某些情况下可以作为一个折中方案。
状态编程(Stateful Programming)
在状态编程中,状态指的是在处理数据流过程中保存和更新的信息。这可能包括计数器、聚合结果、连接操作中的缓冲数据等。有状态算子能够在事件到来时,基于其当前状态和这些事件的数据进行计算。
状态编程在流处理中尤为重要,因为流数据是无界的,必须在算子中保持中间结果以便随时处理新数据。
状态编程主要有两种类型的状态:
算子状态(Operator State):
- 算子状态是任务级别的状态,它由一个算子的所有并行实例所共享。
- 对于算子的每个并行实例,都有一个独立的算子状态版本。
- 例如,在分布式快照(checkpointing)中,一个算子的状态可以被用来恢复到特定的处理点。
键控状态(Keyed State):
- 键控状态是分区的数据流(keyed streams)所专有的,由每个键一个状态实例。
- 用于需要根据数据流中的键来保存状态的场景,例如,保持每个用户的点击次数。
- 每个键控状态都只能被相同键的数据访问和修改。
状态机制(State Backends)
状态后端(State Backends)决定了状态数据的存储和维护方法。它包括状态的两个方面:本地状态的存储和检查点(Checkpoint)的存储。
本地状态存储:
- 对于正在处理的状态数据,它存储在内存(如 TaskManager 的 JVM 堆内存)或持久化存储如 RocksDB 中。内存存储快速但不可靠,而使用 RocksDB 等外部存储则可提供更好的容错特性。
检查点存储:
- 用于存储状态的快照的外部系统,允许在发生故障时恢复状态。
- 例如,Flink 支持存储检查点到远程持久化存储系统(如 HDFS)。
现在来看状态后端的几种配置:
内存状态后端:
- 本地状态保存在 TaskManager 的内存中。
- 检查点保存在 JobManager 的内存中。
- 适用于小状态量的作业,因为它依赖于内存大小。
文件系统状态后端:
- 本地状态保存在 TaskManager 的内存中。
- 检查点保存在一个配置的文件系统(如 HDFS)上。
- 提供容错能力,可以恢复大状态量的作业。
RocksDB 状态后端:
- 本地状态存储在本地的 RocksDB 实例中(通常配置在本地磁盘上)。
- 检查点也保存在配置的文件系统上。
- 适合有大量状态需要维护的场景。由于 RocksDB 是存储在本地磁盘的,它支持非常大的状态大小,比内存后端的限制要大得多。
状态编程和状态后端在分布式系统的正确性(确保数据不丢失)和效率(高吞吐和低延迟)中扮演着关键角色,工程师需要根据具体的应用场景和性能要求,选择合适的状态管理策略。
Interval Join 的实现原理
底层调用的是 keyby+connect ,处理逻辑:
- 判断是否迟到(迟到就不处理了)
- 每条流都存了一个 Map 类型的状态(key 是时间戳,value 是 List 存数据)
- 任一条流,来了一条数据,遍历对方的 map 状态,能匹配上就发往 join 方法
- 超过有效时间范围,会删除对应Map中的数据(不是clear,是remove)
Interval join不会处理join不上的数据,如果需要没join上的数据,可以用 coGroup+connect算子实现,或者直接使用flinksql里的left join或right join语法。
Session Window 的使用
在 Apache Flink 中,Session Windows 用于处理有活动会话或随时间不连续出现的数据。Session Window 的核心是 Session Gap(会话间隔),这是一段定义好的时间长度,在此时间内没有接收到新数据,就会认为当前的会话窗口结束,进而触发窗口的计算。这种窗口类型适合需要根据非连续性数据或周期性数据进行分析的场景。(例如:用户行为分析)
使用Flink里的Session Windows进行数据流处理的示例如下:
1 |
|
以下是Session Window的一些要点:
- 动态窗口边界:Session Windows没有固定的窗口大小,窗口的起止是由数据的实际出现间断来决定的。
- 会话间隔(Session Gap):如果在Session Gap期间没有接收到属于同一会话的数据,则当前窗口会结束。
- 与其他窗口的不同:与 Tumbling Window(滚动窗口)和 Sliding Window(滑动窗口)相比,Session Window 的窗口期是动态的,取决于数据的实际到达模式。
- 窗口合并:对于每一个事件,Session Window机制都会创建一个新的窗口。如果由于新的事件而导致已存在的窗口间隔小于或等于Session Gap,这些窗口将会被合并。
Session Windows通过这种动态的窗口划分,可以灵活地进行基于活跃会话期间的事件处理,适用于那些对会话持续时间敏感的应用场合。
Watermark 机制
Watermark 是一条携带时间戳的特殊数据,从代码指定生成的位置,插入到流里面。
一对多:广播
多对一:取最小
多对多:拆分来看,其实就是上面两种的结合
- Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发
- Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现;
- 基于事件时间,用来触发窗口、定时器等
- watermark 主要属性就是时间戳,可以理解一个特殊的数据,插入到流里面
- watermark 是单调不减的
- 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,如果后续还有 timestamp 小于 Watermark 的数据到达,称为迟到数据
生成方式
间歇性:来一条数据,更新一次 watermark
周期性:固定周期更新watermark
官方提供的api是基于周期的,默认200ms,因为间歇性会给系统带来压力。
Watermark=当前最大事件时间-乱序时间-1ms
分布式快照
Flink 的容错机制的核心部分是制作分布式数据流和操作算子状态的一致性快照。这些快照充当一致性 checkpoint,系统可以在发生故障时回滚。 Flink 用于制作这些快照的机制在“分布式数据流的轻量级异步快照”中进行了描述。它受到分布式快照的标准 Chandy-Lamport 算法的启发,专门针对 Flink 的执行模型而定制。
barriers 在数据流源处被注入并行数据流中。快照 n 的 barriers 被插入的位置(我们称之为 Sn)是快照所包含的数据在数据源中最大位置。
例如,在Apache Kafka中,此位置将是分区中最后一条记录的偏移量。 将该位置Sn报告给checkpoint协调器(Flink的JobManager)。
然后barriers向下游流动。当一个中间操作算子从其所有输入流中收到快照n的barriers时,它会为快照n发出barriers进入其所有输出流中。
一旦sink操作算子(流式DAG的末端)从其所有输入流接收到barriers n,它就向checkpoint协调器确认快照n完成。
在所有sink确认快照后,意味快照着已完成。一旦完成快照n,job将永远不再向数据源请求Sn之前的记录,因为此时这些记录(及其后续记录)将已经通过整个数据流拓扑,也即是已经被处理结束。
CEP 机制
CEP 全称为 Complex Event Processing,复杂事件处理
Flink CEP是在 Flink 中实现的复杂事件处理(CEP)库
CEP 允许在无休止的事件流中检测事件模式,让我们有机会掌握数据中重要的部分
一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据 —— 满足规则的复杂事件
在流式处理中,CEP 当然是要支持 EventTime 的,那么相对应的也要支持数据的迟到现象,也就是watermark的处理逻辑。CEP对未匹配成功的事件序列的处理,和迟到数据是类似的。在 Flink CEP的处理逻辑中,状态没有满足的和迟到的数据,都会存储在一个Map数据结构中,也就是说,如果我们限定判断事件序列的时长为5分钟,那么内存中就会存储5分钟的数据,这在我看来,也是对内存的极大损伤之一。
Flink SQL 的工作机制
通过 Calcite 对编写的 Sql 进行解析、验证、优化等操作。
Blink Planner 与 Calcite 进行对接,对接流程如下:
- 在 Table/SQL 编写完成后,通过 Calcite 中的 parse、validate、rel 阶段,以及 Blink 额外添加的 convert 阶段, 将其先转为 Operation;
- 通过 Blink Planner 的 translateToRel、optimize、translateToExecNodeGraph 和 translateToPlan 四个阶段,将 Operation 转换成 DataStream API 的 Transformation;
- 再经过StreamJraph -> JobGraph -> ExecutionGraph等一系列流程,SQL最终被提交到集群。
SQL 优化
SQL 优化会使用两个优化器:RBO(基于规则的优化器) 和 CBO(基于代价的优化器)
- RBO(基于规则的优化器)会将原有表达式裁剪掉,遍历一系列规则(Rule),只要满足条件就转换,生成最终的执行计划。一些常见的规则包括分区裁剪(Partition Prune)、列裁剪、谓词下推(Predicate Pushdown)、投影下推(Projection Pushdown)、聚合下推、limit 下推、sort 下推、常量折叠(Constant Folding)、子查询内联转 join 等。
- CBO (基于代价的优化器)会将原有表达式保留,基于统计信息和代价模型,尝试探索生成等价关系表达式,最终取代价最小的执行计划。CBO 的实现有两种模型,Volcano 模型,Cascades 模型。这两种模型思想很是相似,不同点在于 Cascades 模型一边遍历 SQL 逻辑树,一边优化,从而进一步裁剪掉一些执行计划。