Hadoop 常见面试问题
本文最后更新于 2024年1月7日 凌晨
Hadoop 常见面试问题
基础
ETL 介绍
Extraction-Transformation-Loading 的缩写,中文名称为数据提取、转换和加载。
Hadoop 介绍
Hadoop 是一个分布式系统基础架构,主要是为了解决海量数据的存储和分析计算的问题。旨在通过将数据分布在集群中的多个计算机上进行并行处理来解决大数据处理的问题。
Hadoop 主要组件
Hadoop 的组成在不同版本中有所变化。以下是关于 Hadoop 1.X、Hadoop 2.X 和 Hadoop 3.X 版本的组成的概述:
Hadoop 1.X 版本的组成:
- Hadoop Common: 提供基本功能和工具,如文件系统、I/O 操作和网络通信。
- Hadoop Distributed File System (HDFS): 用于存储大规模数据集的分布式文件系统。
- MapReduce: 分布式计算框架,用于在 Hadoop 集群上并行处理数据。
Hadoop 2.X 版本的组成:
- Hadoop Common: 提供基础功能和工具。
- Hadoop Distributed File System (HDFS): 用于存储大规模数据集的分布式文件系统。
- YARN (Yet Another Resource Negotiator): 用于资源管理和作业调度的集群资源管理器。YARN 将 MapReduce 从 Hadoop Common 中分离出来,成为一个独立的组件,使 Hadoop 集群能够运行多个分布式计算框架。
- MapReduce: 在 YARN 上运行的分布式计算框架。
Hadoop 3.X 版本的组成:
- Hadoop Common: 提供基础功能和工具。
- Hadoop Distributed File System (HDFS): 用于存储大规模数据集的分布式文件系统。
- YARN (Yet Another Resource Negotiator): 用于资源管理和作业调度的集群资源管理器。
- MapReduce: 在 YARN 上运行的分布式计算框架。
- Hadoop 3.X 引入了新的组件和功能,如:
- Erasure Coding(纠删码): 代替传统的副本机制,提供更高的存储效率。
- Containerization(容器化)支持: 可以使用 Docker 等容器技术来运行和管理 Hadoop 任务。
- 结合 YARN 的改进,Hadoop 3.X 支持更多的分布式计算框架,如 Apache Spark、Apache Flink 等。
Hadoop 主要进程
在启动 Hadoop 集群时,会启动以下进程,每个进程有着不同的角色和作用:
- NameNode:
- 作用: 是 Hadoop 分布式文件系统(HDFS)的主节点,负责维护文件系统的元数据,包括文件的层次结构、文件块的位置以及访问权限等。管理文件系统的命名空间,并协调数据块在集群中的复制、恢复和故障转移。
- SecondaryNameNode (非 HA 模式):
- 作用: 实现 NameNode 的容错机制。定期合并编辑日志与命名空间镜像,当 NameNode 挂掉时,可通过一定步骤进行上顶。值得注意的是,SecondaryNameNode 并不是 NameNode 的备用节点。
- DataNode:
- 作用: 是 HDFS 的从节点,负责存储实际的数据块。DataNode 在本地磁盘上保存数据块,并与 NameNode 通信,报告它们所保存的数据块副本的状态。执行数据块的读取、写入和删除操作。
- ResourceManager:
- 作用: 是 Hadoop YARN 的核心组件,负责集群资源的分配和管理。接收来自客户端和应用程序的资源请求,并将可用的集群资源分配给相应的应用程序。监控和调度各个应用程序的任务,确保它们按照预定的规则执行。
- NodeManager:
- 作用: 是 YARN 的每个节点上的服务,负责管理和监控该节点上的容器(Container)。容器是 YARN 中运行应用程序的最小单元,每个容器包含一个或多个任务。NodeManager 负责启动、监控和终止容器,并向 ResourceManager 报告资源使用情况。
- JournalNode (HA 下启用):
- 作用: 在高可用模式下,存放 NameNode 的 editlog 文件,用于保障在主 NameNode 发生故障时能够快速切换到备用 NameNode。
Hadoop 的常用配置文件
Hadoop2 | core-site. xml | hdfs-site. xml | mapred-site. xml | yarn-site. xml | slaves |
---|---|---|---|---|---|
Hadoop3 | core-site. xmll | hdfs-site. xmll | mapred-site. xmll | yarn-site. xml | workers |
hadoop-env. Sh:
作用: 用于定义 Hadoop 运行环境相关的配置信息,如配置
JAVA_HOME
环境变量、为 Hadoop 的 JVM 指定特定选项、指定日志文件路径、以及master
和slave
文件的位置等。配置示例:
1
2export JAVA_HOME=/path/to/java
export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true"
core-site. Xml:
作用: 用于定义系统级别的参数,包括 HDFS URL、Hadoop 的临时目录、rack-aware 集群中的配置文件位置等。其中的参数定义会覆盖
core-default.xml
文件中的默认配置。配置示例:
1
2
3
4
5
6
7
8
9
10
11
12<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
<!--ID: 1701142137188-->
<!-- 其他配置项 -->
</configuration>
hdfs-site. Xml:
作用: 用于配置 HDFS 的相关参数,包括文件副本个数、块大小、是否使用强制权限等。其中的参数定义会覆盖
hdfs-default.xml
文件中的默认配置。配置示例:
1
2
3
4
5
6
7<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<!-- 其他配置项 -->
</configuration>
mapred-site. Xml:
作用: 用于配置 MapReduce 相关的参数,包括 Reduce 任务的默认个数、任务可使用内存的默认上下限等。其中的参数定义会覆盖
mapred-default.xml
文件中的默认配置。配置示例:
1
2
3
4
5
6
7<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!-- 其他配置项 -->
</configuration>
Hadoop 常用端口号
Hadoop2 | Hadoop3 | |
---|---|---|
访问HDFS端口 | 50070 | 9870 |
访问MR执行情况端口 | 8088 | 8088 |
历史服务器 | 19888 | 19888 |
客户端访问集群端口 | 9000 | 8020 |
Hadoop 小文件问题
危害: 在 Hadoop 中,大量小文件可能对系统造成多方面的危害,主要表现在以下几个方面:
- 存储空间浪费: 每个小文件都占用磁盘存储空间,而且每个文件都需要额外的元数据记录。当小文件数量庞大时,会浪费大量存储资源。
- 元数据开销: Hadoop 的元数据存储在 NameNode 内存中,每个小文件都对应一条元数据记录。当小文件数量增多时,NameNode 需要维护大量元数据记录,可能导致内存压力增加,甚至引起性能问题。
- 系统性能下降: 对大量小文件进行操作时,文件系统需要频繁进行文件定位和寻址操作,可能导致较低的 I/O 性能。同时,会产生碎片化的读写操作,影响整体系统性能。
避免: 为了规避上述问题,可以采取以下解决方法:
- 文件合并: 将小文件合并成较大的文件,以减少文件数量。可以通过脚本或 MapReduce 作业实现文件合并。
- 压缩合并: 对小文件进行合并压缩,将它们存储在一个压缩文件中,以减少存储空间和元数据开销。
- 使用特定文件格式: 利用 Hadoop 提供的文件格式如 SequenceFile 和 Hadoop Archives(HAR),将小文件组合成大文件进行存储,降低元数据开销。
- 分区存储: 根据业务逻辑,将小文件按一定规则进行分区存储,以减小问题的影响范围。
- 调整块大小和副本数: 针对小文件的特性,适度调整 Hadoop 的块大小和副本数,以提高存储空间利用率,但需根据实际情况进行权衡。
Hadoop 的运行模式
Hadoop 支持三种运行模式,分别是单机版、伪分布式模式和完全分布式模式。
- 单机版(Standalone):
- 描述: 单机版是 Hadoop 的最简单运行模式,适用于在单个节点上进行开发和测试。它不涉及分布式环境,所有 Hadoop 组件都运行在同一台机器上。
- 用途: 主要用于小规模数据的本地开发和调试。
- 伪分布式模式(Pseudo-Distributed):
- 描述: 伪分布式模式是在单个节点上模拟分布式环境的运行模式。不同的 Hadoop 组件运行在同一节点的不同进程中,模拟了分布式环境的功能和行为。
- 用途: 适用于在本地机器上模拟分布式集群,进行规模较小的测试和开发。
- 完全分布式模式(Fully-Distributed):
- 描述: 完全分布式模式是 Hadoop 在真实的分布式集群中运行的模式。各个组件分布在不同的机器上,形成一个真正的分布式环境。
- 用途: 用于处理大规模数据集,实现数据存储和计算的分布式处理。适用于生产环境的大数据处理。
Hadoop 1.X 和 Hadoop 2.X 的主要区别
块大小的改变:
Hadoop 1. X: 默认块大小为 64 MB。
Hadoop 2. X: 默认块大小为 128 MB。
资源调度方式的改变:
Hadoop 1.X: 使用 JobTracker 负责任务调度和资源管理,存在单点故障风险,而且任务调度和资源管理全部由 JobTracker 完成,导致单点负担过重。
Hadoop 2.X: 引入了 YARN(Yet Another Resource Negotiator),将任务调度和资源管理分离。ResourceManager负责资源管理,而ApplicationMaster作为任务的管理者单独运行在一个容器中,有效减轻了单点压力。
引入 HA 模式:
Hadoop 1.X: 没有提供官方的高可用性(HA)支持,集群中只有一个 NameNode,存在单点故障风险。
Hadoop 2.X: 引入了 HA 模式,支持多个 NameNode,包括一个 Active NameNode 和一个 Standby NameNode,提高了系统的可用性。
引入 HDFS Federation:
Hadoop 2.X: 引入了 HDFS Federation,通过支持 ZooKeeper 实现了可靠的高可用性。这使得 NameNode 可以横向扩展为多个,每个 NameNode 管理一部分目录,增强了 HDFS 的扩展性和隔离性。
Hadoop 1.X 的缺点:
- 单点故障: JobTracker 存在单点故障的风险,一旦发生故障,整个任务调度和资源管理系统会受到影响。
- 资源管理不均衡: 任务调度和资源管理完全由 JobTracker 完成,导致单点负担过重,容易造成资源管理的不均衡。
- 简单的资源表示: TaskTracker 以 Map/Reduce 数量表示资源,这种简化的表示方式可能不够灵活,无法满足复杂任务的需求。
- Map Slot 和 Reduce Slot 划分不合理: TaskTracker 分 Map Slot 和 Reduce Slot,如果任务只需要 map 任务可能会造成资源浪费。这种划分不够灵活,不能有效地适应各种任务的特性。
HDFS
HDFS 写入文件流程
在 Hadoop HDFS(Hadoop Distributed File System)中,客户端写入文件的过程是一个涉及客户端、NameNode 和 DataNodes 的多步骤协调操作,确保数据在分布式环境中安全可靠地存储。下面是这个过程的详细步骤:
- 初始化文件写入:
- 客户端通过调用 DistributedFileSystem 的
create
方法初始化文件写入过程,它会创建一个文件输出流(FSDataOutputStream)。 - FSDataOutputStream 中包含一个内部队列(DataQueue)和一个背景写入的 DataStreamer 线程。
- 客户端通过调用 DistributedFileSystem 的
- NameNode 交互创建文件:
- DistributedFileSystem 进行一次 RPC(远程过程调用)到 NameNode,请求创建一个新文件。如果文件不存在并且客户端有权限写入,NameNode 将创建一条新的文件条目。
- 初始时,文件没有与之关联的 Block。
- Block 分配:
- 当客户端开始将数据写入 FSDataOutputStream 时,事实上数据首先被写入到 DataQueue。
- DataStreamer 线程周期性地向 NameNode 请求新的数据块(Blocks)分配和 DataNodes 的列表,用于后续存放数据的副本。
- 数据流管道写入:
- DataStreamer 根据从 NameNode 获得的 DataNodes 列表,建立起从客户端到第一个 DataNode,再到第二个 DataNode,然后是第三个 DataNode 的数据流管道(Pipeline)。
- 每个 Block 的数据首先被写到第一个 DataNode,然后流式传输到下一个 DataNode,形成一个副本链。
- 块写入确认:
- 每个 DataNode 接收到数据块后,会存储块数据,并转发到下一个 DataNode。
- 一旦所有 DataNodes 都接收到数据块,并成功存储,它们将发送确认(ACK)回到前一个节点,最终回到客户端。
- 关闭并确认写入:
- 当客户端完成数据写入后,调用 FSDataOutputStream 的
close
方法进行关闭操作。 - 底层,这会触发 DataStreamer 完成最后块的传输并等待 ACK,确保所有数据块的写入和复制是完成并确认的。
- 当客户端完成数据写入后,调用 FSDataOutputStream 的
- 完成写入:
- 最后客户端调用 DistributedFileSystem 的
complete
方法,这将通知 NameNode 文件写入完毕。 - NameNode 记录这些改动到它的持久化存储(EditLog),以确保文件系统的状态是最新的。
- 最后客户端调用 DistributedFileSystem 的
补充信息:
- 数据写入过程中,HDFS 默认创建三个副本以增加数据的可靠性,尽管副本数量可以根据文件系统的配置进行调整。
- HDFS 允许客户端指定副本的放置策略,利用机架感知以优化数据稳定性和读取性能。
- 如果在数据写入过程中发生任何 DataNode 故障,DataStreamer 将负责重新构建数据流管道,同时继续数据写入而不中断客户端。
- 当 DataNode 突然挂掉了,客户端接收不到这个 DataNode 发送的 ack 确认,客户端会通知 NameNode,NameNode 检查该块的副本与规定的不符,NameNode 会通知 DataNode 去复制副本,并将挂掉的 DataNode 作下线处理,不再让它参与文件上传与下载。
- NameNode 的作用是中心协调者,但并不直接参与数据传输,以确保其高性能和稳定性。
- 整个写入流程涵盖了 HDFS 的容错和自恢复机制,确保系统即使在某些组件出现故障时也能保证数据的完整性和可用性。
HDFS 读取文件流程
在 Hadoop HDFS(Hadoop Distributed File System)中,客户端读取数据的过程是一个涉及客户端、NameNode 和多个 DataNodes 的操作。此流程确保客户端能够有效地从分布式环境中检索文件,以下是详细步骤:
- 初始化读取请求:
- 客户端通过调用 DistributedFileSystem 的方法发起读取请求。这个过程涉及到建立 RPC(远程过程调用)与 NameNode 的连接。
- NameNode 查找 Block 信息:
- 客户端请求特定文件的 Block 信息,NameNode 对接收到的请求执行查找操作。
- 如果客户端有权限读取文件,NameNode 将返回包含所有 Block 及其位置的列表,每个 Block 的信息包括存储该 Block 副本的所有 DataNodes 的地址。
- 选择最近的 DataNode:
- 客户端将从 NameNode 返回的 DataNodes 列表中选择最近的一个(通常基于网络距离或机架位置)来读取数据。选择最近的 DataNode 可以减少网络传输延迟,并提高读取效率。
- 并行读取数据:
- 客户端直接与这个选择的 DataNode 建立连接,并开始读取块数据。如果文件比较大,可能涉及并行地从多个 DataNodes 读取多个 Blocks。
- 组装数据成文件:
- 客户端在本地组装和处理从 DataNodes 接收到的 Block 数据,最终构成原始文件的内容。
补充信息:
- 客户端与 DataNode 直接交互传输数据,在整个过程中 NameNode 不直接参与数据传输,它仅仅作为元数据管理者提供数据块的位置信息。
- 如果客户端在读取数据时遇到某个 DataNode 故障或不可访问,客户端将重新选择另一个 DataNode 继续读取同一 Block 的副本。
- HDFS 在读取文件的时候,如果其中一个块突然损坏了,也就是把客户端读取到本地的块与 HDFS 上的原始块进行校验,如果发现校验结果不一致,客户端会通知 NameNode,然后再从下一个拥有该 block 副本的 DataNode 继续读。
- HDFS 也支持从多个 DataNode 并行读取不同 Block 的备份,这进一步提高了读取大文件的性能。
- 在读取数据的过程中,HDFS 的客户端可能会采用一些优化策略如预读取(readahead)和缓存,以减少读取延迟,并进一步提升性能。
- 类似写流程,读流程在 HDFS 中也是高度优化的,它利用了分布式和冗余存储架构的优势,确保数据可靠性和高效读取。
- 读取期间,HDFS 客户端实现能够自动适应 DataNode 的变化,例如对于新增或移除的 DataNodes,客户端会无缝地更新请求策略,保证数据访问不受影响。
HDFS 小文件优化方法
HDFS 小文件弊端:
在存储层面,HDFS 上每个文件都要在 namenode 上建立一个索引,这个索引的大小约为 150 byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用 namenode 的内存空间,另一方面就是索引文件过大是的索引速度变慢。此外在计算层面每个小文件都会起到一个 MapTask,1个 MapTask 默认内存1G。小文件的处理时间甚至远小于 MapTask 或容器的初始化,会造成大量时间与资源浪费。
解决方案:
- Hadoop Archive: 是一个高效地将小文件放入 HDFS 块中的文件存档工具,它能够将多个小文件打包成一个 HAR 文件,这样在减少 namenode 内存使用的同时。
- Sequence file: Sequence file 由一系列的二进制 key/value 组成,如果为 key 小文件名,value 为文件内容,则可以将大批小文件合并成一个大文件。
- CombineFileInputFormat: CombineFileInputFormat 是一种新的 inputformat,用于将多个文件合并成一个单独的 split,另外,它会考虑数据的存储位置。
- JVM 重用: 有小文件场景开启 JVM 重用。(如果没有小文件,不要开启 JVM 重用,因为会一直占用使用到的 task 卡槽,直到任务完成才释放。)
HDFS 的主要组件
HDFS 采用的是 Master/Slave 架构,一个 HDFS 集群包含一个单独的 NameNode 和多个 DataNode 节点。
NameNode:
- 职责: NameNode负责管理整个分布式系统的元数据,包括目录树结构、文件到数据库Block的映射关系、Block副本及其存储位置等。
- 管理数据: DataNode的状态监控通过心跳传递管理信息和数据信息,NameNode可以获知每个DataNode保存的Block信息、DataNode的健康状况等。
- 元数据管理: NameNode在内存中保存目录结构等元数据,同时在磁盘上保存fsimage和editlog文件,用于恢复元数据和记录操作日志。
fsimage
:内存命名空间元数据在外存的镜像文件。editlog
:各种元数据操作的write-ahead-log文件,用于防止数据丢失。
- 故障处理: 如果发现某个DataNode节点故障,NameNode会将其负责的Block在其他DataNode上进行备份。
Secondary NameNode: - 作用: Secondary NameNode并非NameNode的热备机,而是定期从NameNode拉取fsimage和editlog文件,并对两个文件进行合并,形成新的fsimage文件并传回NameNode,从而减轻NameNode的工作压力,提供检查点功能服务。
DataNode: - 职责: DataNode负责实际数据块的存储和读写工作。
- 数据块: Block 默认大小为 128 MB(HDFS 1.x 为 64 MB)。对于大文件,HDFS 会自动将其切割成固定大小的 Block。
- 备份: 为了保证数据可用性,每个Block默认存储3份,即默认的复制因子为3。
Secondary NameNode 的工作机制
Secondary NameNode 主要通过以下两个阶段来工作:
第一阶段:NameNode 启动
- 第一次启动和非第一次启动的区别:
- 第一次启动 NameNode 时,进行格式化,创建 fsimage 和 edits 文件。
- 如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
- 启动 DataNode:
- 向 NameNode 注册
- 发送 block report
- 检查 fsimage 中记录的块的数量和 block report 中的块的总数是否相同
- 客户端请求和元数据操作:
- 客户端对元数据进行增删改的请求。
- NameNode 的操作记录和内存操作:
- NameNode 记录操作日志,更新滚动日志。
- 在内存中对数据进行增删改查。
第二阶段:Secondary NameNode 工作
- 询问是否需要 checkpoint:
- Secondary NameNode 询问 NameNode 是否需要进行 checkpoint。
- NameNode 返回是否需要进行 checkpoint 的结果。
- 请求执行 checkpoint:
- Secondary NameNode 请求执行 checkpoint。
- NameNode 滚动 edits 日志:
- NameNode 滚动正在写的 edits 日志。
- 拷贝编辑日志和镜像文件到 Secondary NameNode:
- 将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode。
- 加载和合并操作:
- Secondary NameNode 加载编辑日志和镜像文件到内存,并进行合并操作。
- 生成新的镜像文件:
- 生成新的镜像文件 fsimage. Chkpoint。
- 拷贝镜像文件到 NameNode:
- 将生成的 fsimage. Chkpoint 拷贝到 NameNode。
- 重新命名镜像文件:
- NameNode 将 fsimage. Chkpoint 重新命名为 fsimage。
Hadoop 中的高可用性机制(HA)
Hadoop 中的高可用性(HA)机制主要通过以下组成部分来实现:
- Active NameNode 和 Standby NameNode:
两台 NameNode 形成互备,一台处于 Active 状态(主 NameNode),为主要提供读写服务;另一台处于 Standby 状态(备 NameNode),用于备份。只有主 NameNode 才能对外提供读写服务。 - ZKFailoverController(主备切换控制器,FC):
ZKFailoverController 作为独立的进程运行,负责对 NameNode 的主备切换进行总体控制。它能及时检测到 NameNode 的健康状况,借助 Zookeeper 实现自动的主备选举和切换。在主 NameNode 故障时,ZKFailoverController 通过 Zookeeper 实现快速而可靠的主备切换。 - Zookeeper 集群:
为主备切换控制器提供主备选举支持。Zookeeper 维护了一个小而稳定的集群,确保在主 NameNode 故障时能够进行高效的主备切换。 - 共享存储系统:
共享存储系统保存了 NameNode 在运行过程中产生的 HDFS 元数据。每次写文件时,需要将日志同步写入共享存储,这个步骤成功才能认定写文件成功。然后备份节点定期从共享存储同步日志,以便进行主备切换。在进行主备切换时,新的主 NameNode 在确认元数据完全同步后才能对外提供服务。 - DataNode 节点:
DataNode 向主 NameNode 和备 NameNode 上报数据块的位置信息,以实现主备 NameNode 对 HDFS 数据块和 DataNode 之间的映射关系的共享。这样,故障切换可以更快速地进行。
补充信息:
- 在 HA 配置中,当发生主备切换时,新的 Active NameNode 首先会加载共享存储中的最后一次检查点(Checkpoint)和 EditLogs,进行元数据的恢复和重放,完成后才开始服务。
- ZKFailoverController 通过心跳机制检测 NameNode 的存活状态。如果 Active NameNode 不再发送心跳,ZKFC 会尝试立刻启动选举流程。
- 在 HDFS HA 配置中,还需要配置额外的 HDFS 客户端重试策略,以适应在故障转移期间可能出现的服务暂时不可用的情况。
- 若选择使用 QJM 作为共享存储系统,需要配置多个 JournalNode,并通常分布在不同的服务器上来提高容错能力,这种多 JournalNode 集群的架构可以确保 EditLog 在多个地方同时写入,以防单点故障。
ZKFC 在 HDFS HA 中的作用
ZKFailoverController(ZKFC)是 Hadoop HDFS 高可用性(HA)配置中的一个关键组件,其主要职责和工作机制如下:
- 健康监测:ZKFC 负责对它所监控的 NameNode(NN)执行周期性的健康检查。这通常涉及发送心跳信号和健康探测命令。若 NN 因为硬件故障、进程崩溃或其他问题无法响应心跳,ZKFC 则将其标记为不健康状态。
- 会话管理:ZKFC 在 Zookeeper 中保持一个活跃的会话,并根据 NameNode 的状态,在 Zookeeper 中创建或维护相应的状态节点(znode)。对于处于 Active 状态的 NN,ZKFC 会创建一个短暂的 znode。这个节点的存在在 Zookeeper 中代表着锁的拥有权,即 Active NN 的位置。如果 Active NN 宕机,该节点会随着会话丢失而被自动删除,这允许其他 NN 捕捉到此事件,并尝试成为新的 Active NN。
- 主从选举:出现 NN 宕机时,ZKFC 会借助 Zookeeper 的选举机制来确定新的 Active NN。各个 Standby NN 上的 ZKFC 对锁进行争抢(通过尝试创建同名短暂 znode),且按照 Zookeeper 的顺序保证,最终有且只有一个 ZKFC 能成功建立节点,对应的 NN 升级为新的 Active 状态。
- 处理故障复原:当先前故障的 NN 重新启动后,它会尝试与 Zookeeper 重新建立会话。如果它发现已有其他 NN 占据了短暂 znode,它将自动转入 Standby 状态,并且准备在需要时再次作为备选积极参与选举。
补充信息:
- 虽然 ZKFC 通常与两个 NameNode(一个 Active,一个 Standby)搭配使用,以实现高可用性的配置,但理论上确实支持多于两个的 NN,以应对更复杂的配置要求。然而,请注意在实际部署中,通常只配置两个 NameNode,因为这样便于管理,并且可以满足大 majority 的场景需求。
- ZKFC 是 HA 架构的自动恢复机制的核心部分,能够确保 NameNode 故障发生时的快速自动切换,减少系统停机时间,同时还防止了可能导致数据不一致的“脑裂”现象。
- 应当在每个 NameNode 上运行一个 ZKFC 实例。当启动一个 ZKFC 时,它首先会尝试立即执行健康检查并成为 Active NameNode;如果当前已有 Active NN,则它将转为 Standby。
- 在整个 HA 与故障转移机制中,ZKFC 与 HDFS 客户端、DataNode 和其他 Hadoop 生态系统组件紧密协作,确保数据访问的高度可靠性和可用性。
HDFS 块大小的选择原理
1. 寻址时间与磁盘传输时间的权衡: 在选择块大小时,需要权衡寻址时间和磁盘传输时间。文件块越大,寻址时间越短,但磁盘传输时间越长;文件块越小,寻址时间越长,但磁盘传输时间越短。这是因为块越大,同样数量的数据需要更少的元数据,但在传输时需要更长的时间。
2. 块大小不能设置过大或过小的原因:
- 块设置过大: 导致磁盘传输时间显著大于寻址时间,影响程序处理效率,尤其在 MapReduce 中,map任务一次处理一个块,块过大会导致运行速度减慢。
- 块设置过小: 会占用大量的NameNode内存来存储元数据,而NameNode的内存是有限的,不适宜。同时,小块会增加寻址时间,导致程序在找块的开始位置时花费较长时间。
3. 合适的块大小选择:
- 寻址时间与传输速率的权衡: 为了使寻址时间仅占传输时间的一小部分,块大小的选择应该考虑寻址时间和磁盘传输时间的相对关系。一般推荐块大小设置为传输时间的数倍。
- 默认块大小 128 MB: HDFS的默认块大小为128 MB。这是通过对寻址时间、传输速率等因素的综合考虑得出的结果。在典型的场景中,128 MB的块大小既可以提高数据传输效率,又能够在一定程度上缓解小文件问题。
4. 块大小的影响与权衡:
- 减少元数据开销: 大块大小减少了文件块的数量,从而降低了NameNode的元数据开销,提高系统性能和扩展性。
- 提高数据传输效率: 大块大小减少了寻址和数据传输的开销,提高了数据读取和写入的效率。
- 缓解小文件问题: 大块大小可以缓解存储大量小文件所带来的问题,提高存储和管理的效率。
5. 影响因素的综合考虑:
- 存储资源利用率: 增大块大小可以提高存储资源利用率,但可能会导致存储碎片化和空间浪费。
- 内存需求: 大块大小可能增加NameNode的内存压力,特别是在大规模存储系统中。
- 并行度: 小块大小可以增加并行度,适用于小规模和低负载系统,而大块大小可能降低并行度。
结论: 在块大小的选择上,需要根据实际需求和系统场景进行综合权衡,适当调整以获得最佳性能和资源利用率。在典型情况下,默认的128 MB块大小已经在性能和效率上做了一定的平衡。
HDFS 常用的数据压缩算法
Hadoop 分布式文件系统(HDFS)支持多种数据压缩算法,常用的包括 bzip2、gzip、lzo 和 snappy。这些算法可以在不同的场景中使用,根据需求选择合适的压缩算法。
常用的压缩算法:
bzip2:
- 优点: 提供较高的压缩比,适用于存储对存储空间要求较高的场景。
- 缺点: 压缩和解压速度相对较慢,不太适用于需要快速处理的场景。
gzip: - 优点: 提供较为平衡的压缩比和压缩速度,是一种常见的通用压缩算法。
- 缺点: 压缩比相对于bzip2较低。
lzo: - 优点: 具有较高的压缩和解压速度,适用于需要高性能的场景。
- 缺点: 压缩比相对较低。
snappy: - 优点: 提供较高的压缩和解压速度,适用于需要快速处理大量数据的场景。
- 缺点: 压缩比相对较低,但在对速度要求较高的场景中广泛使用。
压缩位置:
在HDFS中,数据可以在不同的位置进行压缩,主要包括以下几个方面:
MapReduce 程序中的输出压缩: 在 MapReduce 任务的输出阶段对数据进行压缩,减少数据在磁盘上的存储空间,加速数据的传输和处理过程。
HDFS 存储层面的压缩: HDFS支持在存储层面对数据进行压缩,可以在文件写入HDFS时选择使用不同的压缩算法。
HBase 表压缩: HBase 是建立在 HDFS 之上的分布式数据库,支持对表进行压缩,可以根据实际需求选择适当的压缩算法。
应用场景选择:
- bzip2和gzip: 适用于对存储空间要求较高,而且对处理速度要求相对较低的场景。
- lzo 和 snappy: 适用于需要高性能处理大量数据的场景,其中 snappy 在企业开发中应用较为广泛,特别是对于快速数据处理和分析的需求。
Yarn
Yarn 介绍
YARN(Yet Another Resource Negotiator,另一种资源协调器)是 Apache Hadoop 生态系统中的一个重要组件,用于管理和调度集群资源。其设计目标是提供一个通用的集群管理平台,支持多种大数据处理框架,如 MapReduce、Apache Spark 等。
关键概念:
- ResourceManager(资源管理器): YARN 集群中的主管角色,负责整个集群的资源管理和调度。ResourceManager维护着集群中所有资源的信息,根据应用程序的需求进行资源分配和调度。
- NodeManager(节点管理器): 在每个集群节点上运行,负责管理该节点上的资源。NodeManager 接收 ResourceManager 分配的任务,并启动、监控任务的执行,报告执行状态给 ResourceManager。
- ApplicationMaster(应用程序主管): 是每个运行在集群上的应用程序的主管,负责向 ResourceManager 申请资源、跟踪任务的执行状态、处理任务的失败和容错等。每个应用程序有独立的 ApplicationMaster。
YARN 的工作流程:
- 用户提交应用程序到 ResourceManager。
- ResourceManager 启动一个 ApplicationMaster 来管理该应用程序。
- ApplicationMaster 向 ResourceManager 申请资源,并启动任务执行。
- NodeManager 接收 ResourceManager 分配的任务,启动并监控任务执行。
- ApplicationMaster 与 NodeManager 之间协作,进行任务的执行、监控和报告状态。
- ResourceManager 负责整体资源的调度和集群资源的管理。
优势:
- 通用性: YARN 的设计使其成为通用的资源调度平台,不仅仅局限于支持 MapReduce,还能够容纳其他计算框架,使集群更灵活多样。
- 资源的弹性调度: YARN 允许多个应用程序共享集群资源,实现资源的弹性分配和释放,提高了整个集群的利用率。
- 多框架支持: 不再局限于特定的计算模型,支持多种分布式计算框架,例如 Apache Spark、Apache Flink 等,使得用户能够更灵活地选择合适的计算框架。
- 分布式应用的支持: 通过解耦资源管理和作业调度,YARN 可以更好地支持多个应用程序的并发运行,包括非批处理型应用。
Yarn 的来源
产生原因:
Hadoop 1.x 中的资源管理和作业调度是由单一的 JobTracker 负责的。这种架构在规模庞大的集群上面临一些问题,例如单点故障、扩展性差等。为了解决这些问题,YARN(Yet Another Resource Negotiator)应运而生。
问题解决:
- 单点故障: 在 Hadoop 1.x 中,JobTracker 是单点,一旦出现故障,整个集群的作业调度和资源管理就会受到影响。YARN 引入了 ResourceManager,将资源管理与作业调度分离,避免了单点故障。
- 有限的作业模型: Hadoop 1.x 中主要支持 MapReduce 作业模型,而 YARN 的设计目标是支持更多类型的计算模型,使得不仅仅局限于 MapReduce,还可以运行其他类型的分布式应用程序,如 Spark、Flink、Storm 等。
- 资源利用不高: Hadoop 1.x 中,资源分配是以作业为单位的,一个作业如果分配的资源没有充分利用,则造成资源浪费。YARN 引入了容器的概念,以更细粒度地管理资源,提高了资源的利用率。
优势:
- 资源的弹性调度: YARN 允许多个应用程序共享集群资源,根据需要动态地分配和释放资源。这种弹性的调度机制使得集群的利用率更高。
- 多框架支持: YARN 的设计使得它不再局限于特定的计算模型,而是能够支持多种分布式计算框架。这为用户提供了更多选择,可以根据不同的应用场景选择合适的计算框架。
- 提高集群的整体效率: 通过引入 ResourceManager 和 NodeManager 的架构,YARN 实现了更好的资源管理,提高了集群的整体效率。ResourceManager负责全局资源的分配,NodeManager负责本地资源的管理。
- 支持非批处理应用: 与 Hadoop 1.x 只支持批处理模型不同,YARN 的出现使得支持实时计算、流处理等非批处理应用成为可能。
总的来说,YARN 的出现使得 Hadoop 集群更加灵活、高效,能够更好地适应不同类型的大数据应用。
ResourceManager 介绍
ResourceManager 是 YARN(Yet Another Resource Negotiator)的一个关键组件,是一个全局的资源管理器,主要包括两个组件:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。
调度器(Scheduler)
调度器的主要责任是根据系统中各个应用程序的资源需求,将集群中的资源进行合理的分配。调度器并不涉及具体应用程序的运行细节,它只关心资源的分配。以下是调度器的主要特性和职责:
- 资源分配: 根据容量、队列等限制条件,将系统中的资源分配给各个正在运行的应用程序。
- 资源容器: 调度器使用资源容器(Container)这个抽象概念,将内存、CPU、磁盘、网络等资源封装在一起,作为动态资源分配的单位。每个任务或执行单元都运行在一个资源容器中,限定了它使用的资源量。
- 可插拔性: 调度器是可插拔的组件,用户可以根据需求设计新的调度器。YARN 提供多种可用的调度器,例如 Fair Scheduler 和 Capacity Scheduler。
应用程序管理器(Applications Manager,ASM)
应用程序管理器负责整个系统中所有应用程序的管理,包括应用程序的提交、与调度器协商资源、ApplicationMaster(应用程序主管)的运行状态监控以及在失败时重新启动 ApplicationMaster。以下是应用程序管理器的关键职责:
- 应用程序提交: 负责接收和处理用户提交的应用程序,启动相应的 ApplicationMaster。
- 资源协商: 与调度器协商应用程序所需的资源,包括申请资源、释放资源等。
- 运行状态监控: 监控 ApplicationMaster 的运行状态,负责重新启动 ApplicationMaster,以保证应用程序的高可用性和容错性。
ResourceManager 的引入使得集群资源得到更加灵活和高效的管理,支持不同类型的应用程序在同一集群上并发运行。同时,YARN 的设计也为未来更多计算框架的接入提供了扩展性。
NodeManager 介绍
NodeManager 是 YARN(Yet Another Resource Negotiator)中每个节点上的关键组件,负责运行在节点上的资源和任务的管理。其主要职责包括两个方面:
资源报告
NodeManager 定期向 ResourceManager 汇报本节点上的资源使用情况以及各个运行中的 Container 的状态。这些资源报告包括节点的总体资源情况,例如可用内存、CPU 等,以及各个正在运行的任务(Container)的详细信息。ResourceManager 利用这些报告来进行整个集群资源的监控和调度。
Container 管理
NodeManager 接收来自 ApplicationMaster 的请求,主要涉及 Container 的启动和停止。具体而言,NodeManager 根据 ResourceManager 分配的资源,在节点上启动和管理 Container,以运行特定的任务。同时,NodeManager 也负责停止已经完成的或者不再需要的 Container,释放相应的资源。这种动态的 Container 管理使得 YARN 能够更灵活地适应各种应用程序的运行需求。
总体而言,NodeManager 在集群节点上充当 ResourceManager 的代理,负责具体资源的管理和任务的执行。与 ResourceManager 和 ApplicationMaster 一起,NodeManager 构成了 YARN 的核心组件,支持分布式的、多租户的资源调度和管理。
ApplicationMaster 介绍
ApplicationMaster 是 YARN(Yet Another Resource Negotiator)中每个应用程序的主管,负责管理特定应用程序的资源申请、任务调度、监控和容错等任务。每个被提交到 YARN 集群的应用程序都会包含一个独立的 ApplicationMaster。
主要功能:
- 资源协商:ApplicationMaster 与 ResourceManager 协商获取运行应用程序所需的资源。这个协商的结果是一系列的容器(Containers),它们是运行任务的基本单位。
- 任务调度:ApplicationMaster 将 ResourceManager 分配的资源进一步分配给应用程序内部的具体任务。这可能包括 MapReduce 任务、Spark 任务等,具体取决于应用程序的类型。
- 容器管理:ApplicationMaster 与 NodeManager 通信,请求启动和停止任务所需的容器。NodeManager 是每个节点上运行的组件,负责具体的任务执行和资源管理。
- 监控:ApplicationMaster 监控所有任务的运行状态,收集任务的运行信息和日志。在任务失败的情况下,它会向 ResourceManager 请求重新分配资源,并重新启动失败的任务,实现容错机制。
- 通信与协调:ApplicationMaster 与 ResourceManager、NodeManager 之间进行有效的通信和协调。与 ResourceManager 之间的协商用于获取资源,与 NodeManager 之间的通信用于具体任务的启动和停止。
- 应用程序生命周期管理:ApplicationMaster 管理应用程序的整个生命周期,从资源的申请到任务的完成。它在应用程序执行期间负责协调和管理整个过程。
总体而言,ApplicationMaster 是 YARN 中的关键组件,为应用程序提供了一个独立、分布式的资源管理和执行环境。不同类型的应用程序(如MapReduce、Spark等)都需要编写自己的 ApplicationMaster,以适应其特定的执行需求。
Container 介绍
在 YARN(Yet Another Resource Negotiator)中,Container 是对资源的一种抽象表示,用于封装一个节点上的多维度资源,包括但不限于内存、CPU、磁盘、网络等。当一个应用程序通过其 ApplicationMaster 向 ResourceManager(RM)申请资源时,RM 向应用程序分配的资源就是以 Container 的形式返回的。
主要特点和作用:
- 资源封装:Container 封装了节点上的各类资源,为应用程序提供了一个独立的资源隔离环境。
- 任务执行单位:每个任务(或者说每个进程、容器内的程序实例等)在 YARN 上都会被分配一个 Container,而该任务只能使用该 Container 中描述的资源。这样可以确保任务间的资源隔离。
- 多维度资源描述:Container 不仅包含内存这一维度的资源,还包含了其他可能影响任务执行的资源信息,如 CPU、磁盘、网络等。
- 动态资源分配:ResourceManager 根据应用程序的需求和集群的资源状况,动态分配 Container。这使得 YARN 能够有效地利用集群资源,提高资源利用率。
- 生命周期管理:每个 Container 都有自己的生命周期,包括创建、分配资源、运行任务、释放资源等。这由NodeManager负责。
通过 Container 的使用,YARN 实现了对集群资源的有效管理和分配,为不同类型的应用程序提供了统一的资源调度框架。Container 的抽象使得应用程序可以在分布式环境中运行,同时充分利用集群资源。
Yarn 调度 MapReduce 任务的过程
- MapReduce 程序提交: 客户端将 MapReduce 程序提交到集群中的某个节点。
- Yarnrunner向ResourceManager申请Application: Yarnrunner是一个YARN客户端,向ResourceManager请求一个新的Application。
- ResourceManager返回资源路径: ResourceManager将为该Application分配的资源路径返回给Yarnrunner。
- 程序资源提交到HDFS: MapReduce程序将运行所需的资源提交到HDFS资源路径上,供后续任务使用。
- 申请运行ApplicationMaster: 提交完资源后,程序申请运行ApplicationMaster。
- ResourceManager初始化任务: ResourceManager将用户的请求初始化为一个Task,该Task包含运行Map和Reduce所需的信息。
- NodeManager领取Task任务: 有一个NodeManager领取到Task任务。
- 创建容器Container: 领取Task的NodeManager创建容器Container,并生成ApplicationMaster。
- Container拷贝资源到本地: Container从HDFS上拷贝所需资源到本地。
- ApplicationMaster申请运行MapTask容器: ApplicationMaster向ResourceManager申请运行MapTask容器。
- ResourceManager分配MapTask任务: ResourceManager将运行MapTask任务分配给另外两个NodeManager,它们各自领取任务并创建容器。
- 发送程序启动脚本: ResourceManager向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager启动MapTask,对数据分区排序。
- 申请运行ReduceTask容器: ApplicationMaster向ResourceManager申请运行ReduceTask容器。
- ReduceTask获取分区数据: ReduceTask向MapTask获取相应分区的数据。
- 程序运行完毕: 程序运行完毕后,ApplicationMaster向ResourceManager注销自己。
补充信息:
- 在 YARN 中,ResourceManager 主要负责资源管理和作业调度,而 ApplicationMaster 负责单个作业的生命周期管理和资源协调。
- NodeManager 是运行在每个节点上的 YARN 组件,管理单个节点的资源并监控容器的执行。
- 容器 Container 是 YARN 中的资源抽象,代表了一定量的集群资源,如 CPU 内存等。
- YARN 的设计将资源管理与作业调度分离开,提高了集群资源利用率,同时支持了多种计算框架并且易于扩展。
- 整个过程是由 YARN 的调度器和复杂的内部状态机管理的,确保资源的高效使用和作业的高效执行。
Yarn 中常用的调度器
FIFO(First In, First Out):
- 特点: 是 Hadoop 中的默认调度器。按照作业的优先级和到达时间先后选择被执行的作业。
- 工作原理: 根据作业的优先级高低和提交时间先后进行调度,较早提交或者优先级较高的作业先被执行。
- 适用场景: 适用于简单场景,不需要复杂的资源管理和分配策略。
计算能力调度器 Capacity Scheduler: - 资源划分: 容量调度将整个集群的资源划分为多个队列,每个队列被分配一个预定义的容量百分比。这样,每个队列可以获得一定比例的集群资源。
- 资源使用: 在容量调度中,每个队列可以使用其分配的容量,并在队列空闲时使用集群的剩余资源。
- 队列优先级: 队列之间可以定义优先级,高优先级的队列将获得更多的资源。
- 预留容量: 可以为每个队列配置最小和最大资源,以确保队列至少获得其最小容量,同时不超过其最大容量。
公平调度器 Fair Scheduler: - 资源分配: 公平调度试图在集群的所有作业之间实现公平分配资源。不同于容量调度,它不将资源划分为固定的队列。
- 动态分配: 资源是动态分配的,而不是预先分配给特定队列。空闲资源将被分配给最需要资源的作业,以实现公平性。
- 作业池: 公平调度使用作业池,作业池中的作业根据当前资源需求来动态获取资源。
- 权重和公平性: 可以为不同的作业分配权重,以影响它们获取资源的优先级。公平调度的目标是使得所有作业在长期内都能获得相似的资源分配,实现公平性。
区别总结:
- 资源分配方式: 容量调度预先将资源分配给队列,而公平调度动态分配资源给作业。
- 队列与作业: 容量调度中有明确的队列概念,而公平调度则更注重整个作业池中的作业。
- 调度方式: 容量调度以队列为单位,每个队列有自己的容量,而公平调度更注重整体公平性,动态地将资源分配给最需要的作业。
选择容量调度还是公平调度通常取决于集群的需求和使用场景,以及对资源分配策略的不同偏好。Apache 默认的资源调度器是容量调度器;CDH默认的资源调度器是公平调度器。
MapReduce
MapReduce 作业的执行流程
在 Hadoop MapReduce 框架中,一项作业(job)通常被分为两大阶段:Map 阶段和 Reduce 阶段。每个阶段均包含一系列步骤:
Map 阶段:
- 输入分片(Input Splits):
- 作业开始前,MapReduce 框架会根据输入文件的大小以及配置的分片大小进行计算,将输入数据切割成多个分片(input splits)。
- 每个输入分片对应一个 Map 任务,由一个 MapTask 处理。
- Map 函数:
- 开发者编写的 Map 函数被执行,通常是在数据存储的节点上运行以实现数据的本地化处理。
- 分区(Partition):
- 分区函数决定了 Map 的输出键(key)应该被发送到哪个 Reduce 任务处理,数量一般等于 Reduce 任务的数量。
- 默认使用的是 HashPartitioner,它基于 key 的哈希值执行分区。
- 溢写(Spill):
- Map 任务在处理过程中,数据先被写入环形内存缓冲区,并在达到阈值时溢出到磁盘,形成溢出文件。
- 这个阶段分为两个子步骤:
- 排序(Sort):写入环形缓冲区前对数据进行排序,格式为<key, value, partition>。
- 合并(Combine,可选):在数据被溢出到磁盘前,可以选择进行一次局部的 combine 操作,这相当于提前执行了一次局部的 reduce 操作,合并具有相同 key 的数据。
- 合并(Merge):
- Map 任务生成的多个 spill 文件在最终被传递到 Reduce 任务之前需要被合并。
- 合并过程创建了一个大的排序文件及其索引文件。
Reduce 阶段:
- 拷贝(Copy):
- Reduce 任务通过 HTTP 从所有 MapTask 节点拷贝它们的输出结果。
- 拷贝操作先将数据放到内部缓冲区,然后触发溢写到磁盘的操作。
- 合并(Merge):
- Reduce 端将拷贝来的多个 Map 输出文件合并成一个大文件,这个过程可能包含多次磁盘到磁盘的合并。
- Shuffle:
- 每个 Reduce 任务通过 shuffle 过程对来自所有 Map 输出的数据进行重新排序和组合,产生一个大文件供后续执行 Reduce 函数。
- Reduce 函数:
- 最后,执行用户自定义的 Reduce 函数,将处理结果写入 HDFS。
补充信息:
- Spill 过程:Spill 是 MapTask 将内存中的数据写入磁盘时的一个阶段,通常涉及排序(按 key 的字典序)和可选的 combine(局部 Reduce)操作,以减少数据量。
- Combine 函数:Combine 函数是一个可选的阶段,在 Map 端用于局部聚合以减少数据传输量,类似于一个小规模的 Reduce 操作。
- Shuffle 阶段:虽然未在上述说明中明确标出,Shuffle 是 Reduce 阶段中的一个隐含步骤,负责将 MapTask 输出的数据合并在一起,并按照 key 排序,以供 Reduce 函数处理。
- MapReduce 框架的设计旨在有效处理大规模数据集,其设计哲学侧重于高效的数据处理和传输。
- Shuffle 是 MapReduce 中特有的过程,负责数据从 Map 阶段到 Reduce 阶段的转移。
- 为了更好地控制网络和存储资源,MapReduce 允许调整缓冲区大小、溢写阈值和并发拷贝操作的数量等多个参数。
- 处理流程是高度优化的,尽可能将计算移动到数据所在的地方,减少网络传输。
- “Combine” 是一个对性能影响很大的可选阶段,在 Map 端对输出结果进行局部预聚合,从而减少向 Reduce 传输的数据量。
- MapReduce 也有不足之处,如 Shuffle 过程可能成为瓶颈,以及对小型作业而言引导时间长。因此,对于某些用例,其他大数据处理框架(如 Apache Spark)可能更适合。
注意:MapTask 的 Combine 阶段和 ReduceTask 中 Sort 后 Shuffle 到 Reduce 函数的过程是具有显著区别的。Combine 是可选的,发生在 MapTask 写磁盘之前,以减少数据量;而 Shuffle 是必须的,保证 ReduceTask 喂给 Reduce 函数的数据是排序并且按照 key 聚合好的。
Mapper 和 Reducer 数量的决定因素
在 Hadoop MapReduce 框架中,Mapper 和 Reducer 的数量对于作业的运行效率和资源利用率有很大影响,这些数值可以根据以下方面来确定:
Mapper 的个数:
- 主要由输入数据的分片(Input Split)决定,每个分片对应一个 Mapper。
- 分片的大小可以通过MapReduce配置参数来控制,例如:
mapreduce.input.fileinputformat.split.minsize
:设置最小的分片大小。mapreduce.input.fileinputformat.split.maxsize
:设置最大的分片大小。
- 通过调整分片大小,可以影响 Mapper 的个数。
Reducer 的个数:
- Reducer 的个数在作业配置中可以直接设置,通过调用
job.setNumReduceTasks(int tasks)
方法,其中tasks
为 Reducer 的数量。 - 默认情况下,如果未进行设置,Reducer 的个数为 1。
- 确定合适的 Reducer 数量通常考虑以下因素:
- 数据规模和输出片段数:如果 Map 阶段的输出数据量较小,则可能不需要太多的 Reducer。
- 计算复杂性:对于计算密集型任务,可能需要更多的 Reducer 来并行处理。
- 集群规模和资源:集群的规模和当前任务节点的可用资源是决定 Reducer 数量的重要依据。
- 数据倾斜问题:避免因某些 Reducer 遇到数据倾斜导致的性能瓶颈,可以通过自定义分区策略或增加 Reducer 的数量来缓解。
补充信息:
- 在配置 Mapper 和 Reducer 个数时,还需要注意不要过多地设置 Mapper 或 Reducer,以免造成过大的初始化开销和调度压力。
- Mapper 和 Reducer 之间存在一个“Shuffle and Sort”的过程,这个过程也会影响性能。Reducer 的个数不宜过少,以避免成为处理瓶颈,但是也不宜过多,以免增加网络传输和排序的负担。
- 在分布式计算中,通常不推荐一个 Reducer 处理过多的数据,这样会使得作业的完成时间受到该 Reducer 的处理速度的影响。
- 考虑作业的具体需求和集群的能力,合理配置 Mapper 和 Reducer 的数量非常关键。在某些情况下,进行实验和调优也是必要的。
MapReduce 中的常见排序
排序的种类:
部分排序(Partial Sorting):
- MapReduce 自动在每个 MapTask 输出结果上执行部分排序,确保产出的每个文件内部有序。这便于后续在 Reduce 阶段的合并操作。
全排序(Total Sorting): - 要在整个数据集上进行全局的排序,可以使用单个 Reducer 来确保所有数据通过单一点进行排序,但这通常效率低且不适合大数据集。
- 另一个效率更高的方法是分区排序:划分多个区间(例如,根据字母或数值范围),在 MapReduce 任务中配置多个 Reducer,每个 Reducer 负责其区间内的局部排序,最终合并这些局部有序文件以生成全局排序文件。
辅助排序(Secondary Sorting): - 在 MapReduce 中,可以通过自定义 GroupingComparator(分组比较器)来控制哪些键值对会一起传递给每个调用 reduce 方法的迭代。这种方式允许在同一个 reduce 调用中对键值对进行辅助排序。
二次排序(Secondary Sorting): - 通过在 Bean 中实现 WritableComparable 接口并重写 compareTo 方法,可以实现键(key)上的排序,同时可以考虑到对值(value)的排序,实现所谓的二次排序。
排序发生的阶段:
MapSide:
- Spill 阶段:MapTask 清空其内存缓冲区时会发生排序,即 spill 阶段,这是 Map 端第一次排序,对输出结果按 key(键)进行排序。
- Partitioner 阶段:排序后,Partitioner 判断数据去往哪个 Reducer,此时依据的是 key(键)的部分排序结果。
ReduceSide: - Copy 阶段:ReduceTask 从各个 MapTask 拷贝数据,这些数据已经在 Map 端排序。
- Reduce 阶段:在数据最终到达 reduce 方法之前会进行排序(如果有必要),reduce 方法输入的数据是按照 key(键)进行分组的数据集。
补充信息:
- 在 MapReduce 中,排序是框架自动处理的一个组成部分,在 Map 端和 Reduce 端都会进行。
- 当需要更复杂的排序时,开发者可以通过实现自定义的 Comparator 或 WritableComparable 接口来控制数据的排序方式。
- MapReduce 的设计理念是通过 shuffle 阶段来传输、合并和排序数据,为 reduce 函数提供按 key(键)排序的数据集。组合这些阶段可以实现复杂的排序需求。
序列化和反序列化
- 序列化:是指将程序中的对象转换为适合存储或传输的数据格式的过程,通常用于网络通信和数据持久化。在 Hadoop 中,序列化保证了数据的快速、紧凑的传递格式。
- 反序列化:则是将存储或传输的数据恢复为原始对象的过程。
- Java 序列化:是 Java 原生支持的序列化框架,通过实现
Serializable
接口来使用,但它包含了很多额外的信息。 - Hadoop 序列化:Hadoop 推出了自己的序列化接口
Writable
,用以替代 Java 的序列化框架,以适应网络传输的高效要求。
自定义 Bean 对象序列化
- 实现 Writable 接口:自定义的 Bean 类必须实现 Hadoop 的
Writable
接口,以支持序列化框架。 - 空参构造函数:必须提供一个空参数的构造函数,因为在反序列化过程中,Hadoop 通过反射会调用这个构造函数来实例化对象。
- **写入序列化方法
write
**:重写write(DataOutput out)
方法,定义对象序列化的逻辑。 - **实现反序列化方法
readFields
**:重写readFields(DataInput in)
方法,定义对象的反序列化逻辑,确保反序列化顺序与序列化顺序完全一致。 - 重写
toString()
方法:为了将 Bean 对象的数据结果展示在文本文件中便于读取,通常需要重写toString()
方法并以制表符\t
为分隔符格式化输出。 - 实现 Comparable 接口(如果需要):如果 Bean 对象需要作为 MapReduce 框架中的 Key 进行传递,则还需要实现
Comparable
接口以支持数据在 Shuffle 过程中的排序。
补充信息:
- 在 MapReduce 编程中,所有需要在网络间传输或者需要写到磁盘上的对象都应该实现
Writable
接口。 - 与 Java 的
Serializable
接口不同的是,Writable
接口会更关注于性能优化,省略 Java 序列化中的些许元数据,从而实现更高效的序列化和反序列化。 - MapReduce 中的 Key 类必须实现
WritableComparable
接口,因为 MapReduce 框架依赖于 Key 比较排序以实现任务的分发和输出的排序。
MapReduce 2.0 的容错机制
MapReduce 2.0 (YARN) 的容错机制涉及了两个主要的组件:MRAppMaster 和 Task(Map Task/Reduce Task)。
- MRAppMaster 容错性:
- MRAppMaster 负责管理整个 MapReduce 作业的执行。如果 MRAppMaster 失败,YARN 的 ResourceManager 会负责重新启动它。这包括重新初始化作业资源和调度作业中未完成的任务。
- 用户可以配置 MRAppMaster 的最大重启次数。如果在达到最大尝试次数之前 MRAppMaster 成功恢复,则作业继续执行;否则,整个作业会被标记为失败。
- 默认重启次数:MRAppMaster 默认的最大重启次数是 2次。
- Map Task/Reduce Task 容错性:
- 实际的数据处理是通过 Map Task 和 Reduce Task 来完成的。这些任务在运行过程中会周期性地向 MRAppMaster 发送心跳信号。
- 如果任务失败或挂掉,MRAppMaster 将重新为此任务申请资源并重新启动该任务。这一机制保证了,即使在节点故障的情况下,MapReduce 作业的计算能力不会丢失。
- 和 MRAppMaster 类似,Map Task 和 Reduce Task 的重新启动次数也是可以配置的。
- 默认重新运行次数:单个任务默认最多被重新尝试 4次。
补充信息:
- 除了上述重启机制,YARN 还允许配置节点失效后的回滚机制,当任务运行在某节点上多次失败时,会将该节点标记为失效(即“黑名单”),并尝试在其他节点上运行相关任务。
- 容错机制不仅覆盖了硬件故障,也覆盖了因软件错误导致的任务失败,如程序异常退出或临时软件故障。
- 资源的管理和调度不由 MapReduce 自身管理,而是由 YARN 的 ResourceManager 和 NodeManager 来统一管理,ResourceManager 负责资源分配,NodeManager 负责节点的监控和任务管理,提高了集群的稳定性和资源利用率。
注意:虽然容错机制能够在任务或应用挂掉时提供故障恢复,但如果是数据丢失,则需要 HDFS 的数据冗余策略来保证数据的容错性和高可用性。
FileInputFormat 的切片机制
FileInputFormat 切片机制:
FileInputFormat
是 Hadoop MapReduce 框架中负责分割输入文件的抽象基类。- 切片(Split)是指 MapReduce 中将大量的输入数据分割成易于处理的小部分,每个切片由一个 Map 任务处理。
- 切片机制优势在于,它能够根据数据位置信息自动调整 Map 任务的分布,从而最大化数据本地性并减少网络传输量。
- 切片大小默认与 HDFS 块大小相同,但可以由用户通过配置调整,切片的大小直接影响到 MapReduce 作业的并行度及性能。
- 高效的切片机制可以显著加快处理速度,因为数据处理与数据存储位置更贴近(数据本地性原则)。
作业提交流程中的 FileInputFormat 切片:
在 Hadoop 的 MapReduce 作业提交流程中,初始化和提交作业的步骤涉及到 FileInputFormat 来确定输入数据如何被切片处理。
- 建立连接:
- 在调用
Job.waitForCompletion()
方法时,会首先调用Job.submit()
方法来初始化作业提交流程。 Job.submit()
方法会创建一个Cluster
实例,这个过程包括判断执行环境是本地模式还是集群模式。
- 在调用
- 提交 Job:
- 在
Submitter.submitJobInternal
方法中,执行如下关键步骤:- 创建作业的临时存放路径(Staging Directory)。
- 获取新的 JobID 并创建作业的目录。
- 拷贝作业相关文件(Jar包等)到集群上的路径。
- 在
- 计算切片(关键环节):
- 在
WriteSplits
方法中,使用InputFormat.getSplits
方法来计算数据切片。 - 此方法基于文件的大小、块大小以及可选的文件切片策略来创建一个切片规划文件。
- 这个规划文件决定了作业中的 Map 任务数量及每个 Map 任务处理的数据范围。
- 在
- 提交作业的配置信息:
- 将配置信息写入 XML 文件并存放在 Staging Directory 路径下,供集群中的任务进行读取。
- 提交作业执行任务:
- 作业通过
JobSubmitter
提交给 ResourceManager,由它来分配资源进行执行。 - 提交后得到一个作业状态对象
Status
,它代表了当前作业的运行状态信息。
- 作业通过
例如,JobClient 将会将切片信息和作业配置信息连同其他必要的文件上传到 HDFS,当 MapReduce 系统初始化作业时,这些切片信息和配置信息将被读取和解析,确保 Map 任务可以准确地根据切片信息对数据进行处理。
InputSplit 介绍
InputSplit 定义:
- 在 Hadoop MapReduce 框架中,
InputSplit
表示作业的输入数据集的一个分片;简而言之,它是待处理数据的一个单元。 - 每个
InputSplit
由一个独立的 Mapper 进程处理,可以理解为 Map 任务的输入数据块。 InputSplit
不一定与 HDFS 上的物理数据块大小相同,它是在逻辑层面上的划分,而不改变实际数据存储的物理形式。
InputSplit 在数据处理中的作用:
- 数据本地性优化:InputSplit 包含了额外的信息,比如所包含数据的存储位置(节点列表),MapReduce 会尽可能地在拥有数据副本的节点上安排执行对应的 Map 任务。
- 增加并行度:通过分割成多个 InputSplits,可以在不同节点上并行处理数据,从而有效地利用集群资源,提升作业处理速度。
InputSplit 计算过程(部分源码解析):
- 数据存储目录定位:
- 系统首先定位数据文件所在的目录。
- 目录下文件遍历与切片规划:
- 对于目录下的每一个文件,例如
ss.txt
,进行遍历处理。
- 对于目录下的每一个文件,例如
- 单个文件的切片计算:
- 获取文件的大小:
fs.SizeOf(ss.txt);
。 - 计算切片大小:通过
ComputeSplitSize
方法根据配置和块大小来确定合适的切片大小。 - 默认情况下切片大小等于 HDFS 块的大小(如 128MB)。
- 根据计算出的切片大小对文件进行切割:
- 形成的切片例如
ss.txt—0:128M
,ss.txt—128:256M
,ss.txt—256M:300M
. - 如果最后一个切片剩余部分小于等于一个块大小的 1.1 倍,则该部分也会形成一个切片。
- 形成的切片例如
- 获取文件的大小:
- 切片信息记录:
- 将每一个文件的切片信息记录在一个切片规划文件中。
- 这些信息包括每个切片的起始位置、长度以及所在的节点信息。
- 这一核心过程在
InputFormat.getSplits
方法中完成。
- InputSplit 与 HDFS Block 关系:
- 重要的是要区分 HDFS 上物理存储的 Block 与逻辑上的 InputSplit;
- Block 是数据在 HDFS 上的物理存储单位,而切片是 MapReduce 在处理数据时逻辑上的分区。
- 切片规划文件提交至 YARN:
- 提交切片规划文件至 YARN,ResourceManager 的子组件 MRAppMaster 会基于这些切片规划文件来决定启动多少个 MapTasks。
通过使用 InputSplit,Hadoop 能更高效地对大数据集进行处理,减少了不必要的数据传输,并提高了作业执行的整体效率。在 MapReduce 任务配置和优化中,合理设置 InputSplit 的大小是保证高性能的关键因素之一。
Combiner 的目的和作用
Combiner 的作用:
- 局部汇总:Combiner 的核心作用是在 Map 端提前执行数据汇总,从而减少在 Shuffle 阶段数据在网络上的传输量。
- 性能优化:通过减少数据传输,Combiner 可以显著提高 MapReduce 作业的性能,特别是在网络带宽是瓶颈的情况下。
一般使用情景:
- 适合于合并操作:当 MapReduce 任务的 reduce 操作具有结合性质时(如求和、求最大/最小值、平均值等),使用 Combiner 可以提高效率。
- 业务逻辑不变:只有不影响最终结果的合并操作才能使用 Combiner;否则,可能会导致结果出错。
哪些情境不需要 Combiner:
- 非结合性操作:如果合并逻辑不满足结合律,即
combiner(combiner(a, b), c)
与combiner(a, combiner(b, c))
结果不一致,则不适合使用 Combiner。 - 输出类型差异:如果 Map 的输出类型与 Reduce 的输入类型不相同,而 Combiner 又不能产生与 Reduce 输入类型一致的数据,那么也不能使用 Combiner。
- 结果计算依赖全局视图:当结果的计算需要全局数据视图时,如中位数、各种距离计算等,Combiner 可能无法正确应用。
与 Reduce 的区别:
- 运行位置不同:
- Combiner 是在每个 MapTask 结束后,在本地节点上运行。
- Reduce 是在 Shuffle 阶段之后,在集群中分配的 Reduce 任务节点上运行。
- 处理的数据范围不同:
- Combiner 只合并来自同一个 MapTask 的输出,是局部的数据处理。
- Reduce 处理来自所有 MapTask 的全局数据。
- 目的和影响范围不同:
- Combiner 主要目的是优化性能,尤其是减少数据传输量。
- Reduce 则负责执行最终的业务逻辑,产生最终结果。
实施 Combiner 的技巧:
- 设计时要保证 Combiner 操作是可交换且关联的,从而不会改变最终的 Reduce 结果。
- 应确保 Combiner 的输出类型与 Reduce 阶段的输入类型相匹配。
- 使用时需要明确其可能带来的性能提升,并通过实验分析其对具体 MapReduce 作业的作用。
总体来说,Combiner 可以显著提高 MapReduce 作业的效率,尤其是在处理大数据集时,但需要谨慎使用,确保其应用不会影响到最终的业务逻辑。
默认 Partitioner 机制
在 MapReduce 框架中,如果作业开发者没有指定自定义的 Partitioner 类,那么框架会使用默认的 Partitioner 机制来为 Shuffle 阶段的数据划分分区。默认的分区机制非常简单:
- HashCode 运算:首先,系统会对每个键值对的 key 调用
hashCode()
方法,得到该 key 的哈希码(hashcode)。 - 取模运算:然后,将这个哈希码通过模运算
%
与 Reducer 任务数量进行运算。运算的结果,也就是余数,用来作为当前数据的分区号。
例如,如果有 5 个 Reducer 任务,那么数据项的分区号将是 hashCode(key) % 5
的结果,分区号从 0 到 4。
工作流程举例:
假设键值对中有 key 为 A、B、C 的三条数据,hashCode 运算后的结果是:
- hashCode(A) = 4
- hashCode(B) = 2
- hashCode(C) = 7
并且假设有 3 个 Reducer 任务。根据默认的 Partitioner 逻辑,其分区会是:
- Partition for A = 4 % 3 = 1
- Partition for B = 2 % 3 = 2
- Partition for C = 7 % 3 = 1
这样,键 A 和 C 将分到分区 1,键 B 将分到分区 2。
注意事项:
- 哈希冲突和模数运算可能导致数据倾斜问题,即某些 Reducer 任务所处理的数据量可能远大于其他 Reducer。数据倾斜会影响 MapReduce 作业的执行效率。
- 默认 Partitioner 对数据的分布假设是均匀的。如果数据本身分布不均,可能需要使用自定义的 Partitioner 来获取更有效的数据分布,尤其是当数据量很大时。
默认分区器的这种行为保证了相同 key 的所有数据最终会被分配给同一个 Reducer,这对于保持 Reduce 阶段处理逻辑的正确性是至关重要的。
MapReduce 数据倾斜解决方案
- 提前在 map 阶段进行 combine,减少数据传输量
在 Mapper 中加入 Combiner 相当于提前进行 Reduce 操作,即在单个 Mapper 中对相同的 key 进行聚合,减少了 Shuffle 过程中的数据传输量,并减轻了 Reducer 端的计算负担。然而,若导致数据倾斜的 key 大量分布在不同的 Mapper 上,这种方法效果就不是很明显。 - 导致数据倾斜的 key 大量分布在不同的 Mapper 时的解决方法
- 局部聚合加全局聚合
- 首次 Map 阶段,在导致数据倾斜的 key 上附加 1 至 n 的随机前缀。这样即使原本相同的 key 也会被分配到多个 Reducer 进行局部聚合,从而大幅减少了单个 Reducer 处理的数据量。
- 第二次 MapReduce 过程中,去除 key 的随机前缀,进行全局聚合操作。
- 思想:通过两次 MapReduce,第一次将 key 随机散列到不同的 Reducer 进行处理,达到负载均衡的目的。第二次根据去除随机前缀后的 key 进行最终的 Reduce 处理。
注: 这种方法需要进行两次 MapReduce,性能略有下降。
- 增加 Reducer,提高并行度
通过增加 Reducer 的数量可以提高整体作业的并行度:可以使用 JobConf.setNumReduceTasks (int) 来指定 Reducer 的数量。 - 实现自定义分区
根据数据分布情况,可以自定义散列函数,将 key 均匀地分配到不同的 Reducer 上,以实现负载均衡。
- 局部聚合加全局聚合
这些方法为解决数据倾斜问题提供了一些思路和策略,可以根据具体情况选择适合的方法来优化 MapReduce 作业的性能。
使用 MapReduce 实现 TopN 功能
实现 MapReduce 中的 TopN 功能通常涉及以下几个步骤:
- Map 阶段:
- Mapper 读取输入数据,直接将读取的值或者经过初步处理后的键值对发送到 Reduce 阶段。
- 可以考虑在 Map 阶段内部实现一个局部 TopN,减少网络传输和 Reduce 负载。
- Shuffle 阶段的定制:
- 自定义排序:实现自定义的排序比较器(
WritableComparator
)确保数据按照期望的顺序(如降序)进行排序。 - 自定义分组:自定义分组比较器(
GroupingComparator
)来确保具有相同值的 key 能够被视为一个组,在同一个 Reduce 调用中被处理。
- 自定义排序:实现自定义的排序比较器(
- Reduce 阶段:
- 单个 Reducer:仅仅使用一个 Reducer 来保证全局排序并输出 TopN。
- 多个 Reducer:如果数据量巨大,可以使用多个 Reducer,但是每个 Reducer 输出的前 N 个记录,再对这些记录进行一次全局的 TopN。
- 输出前 N 个记录:
- 在 Reduce 函数中,对每个分组处理的结果(已经按排序好的顺序),只输出当前组的前 N 个记录。
- 如果是单个 Reducer 实现,可以在 Reduce 函数中维护一个大小为 N 的优先队列或者其他数据结构,以保持当前见过的最大的 N 条记录。
- 优化性能:
- 如果数据倾斜严重,需要预处理数据或自定义 Partitioner 和 GroupingComparator 以分散工作负载。
MapReduce 不适合处理的情况
MapReduce 作为一个分布式计算框架,特别适用于大规模数据处理,但它并不适合所有计算场景。以下是一些不适合使用 MapReduce 来提速的情况:
- 小规模数据处理:
- 当数据集非常小的时候,MapReduce 的启动和管理开销可能会超过实际的计算时间,导致效率低下。
- 处理大量小文件:
- MapReduce 适合处理少量的大文件。大量的小文件会导致 NameNode 存储的元数据快速增加,增加管理开销,并且任务的启动时间可能会变长。
- 查询操作优于索引机制:
- 当需要高效的随机访问或查询特定记录时,传统的关系型数据库索引机制比 MapReduce 更加适合,因为后者是批量处理系统,不擅长快速查询。
- 实时事务处理:
- MapReduce 不适合需要实时或近实时的事务处理的场景,比如需要快速响应的在线交易处理系统。
- 单个计算节点:
- 如果只有一台机器可用,使用 MapReduce 并不会带来任何性能提升,因为 MapReduce 是为分布式计算设计,能够在多个节点上并行处理数据
MapReduce 运行效率可能的原因
MapReduce 程序运行效率低下可能是由以下因素导致的:
- 计算资源受限:
- CPU 性能:处理器计算能力不足可能成为瓶颈。
- 内存容量:内存不足可能导致频繁的垃圾回收,甚至是数据溢写到磁盘,严重影响性能。
- 磁盘性能:低速磁盘会延迟数据读写操作,尤其是 MapReduce 高依赖磁盘的场景。
- 网络带宽:网络带宽限制可能导致节点间数据传输变慢,特别是在 Shuffle 阶段。
- I/O 优化问题:
- 数据斜偏:部分 Reducer 负载过重,而其他 Reducer 空闲,导致资源利用不均衡。
- Map/Reduce 任务数设置不当:任务数设置不合理可能导致资源利用不充分或过多的任务调度开销。
- Reduce 任务等待时间过长:Map 任务和 Reduce 任务之间处理速度不匹配,导致 Reduce 任务长时间等待数据。
- 处理小文件过多:每个小文件生成一个 Map 任务,导致大量时间花费在任务调度上。
- 处理不可分块的超大文件:一个超大文件可能导致单个 Map 任务处理时间过长,不易并行化,造成负载不均。
- Spill 次数过多:MapReduce 在内存不足以容纳所有中间结果时,会将数据溢写到磁盘,频繁操作导致性能下降。
- Merge 次数过多:在 Map 和 Reduce 阶段都可能发生多次 Merge 操作,增加了额外的 I/O 开销。
为了提升 MapReduce 程序的运行效率,可以考虑以下优化措施:
- 根据应用场景和集群情况合理地配置任务数。
- 对数据进行预处理以减少倾斜,可能的话合并小文件。
- 调整内存配置来减少 Spill 和 Merge 的发生。
- 优化网络配置和磁盘访问策略来提高数据传输和存储效率。
- 监控和调优集群性能,以充分利用计算资源。
总之,MapReduce 的性能优化需要考虑多个层面,包括硬件资源、作业配置、数据特征以及系统架构等方面。
MapReduce 调优方法
MapReduce 任务的优化有多个环节,可以从数据输入、Map 阶段、Reduce 阶段、I/O 传输效率以及数据倾斜问题几个方面进行优化。
- 数据输入:
- 合并小文件:在执行任务前将多个小文件合并成较大的文件,减少因处理大量小文件而生成的众多 Map 任务,减轻调度负担。
- CombinFileInputFormat:使用该输入格式可以将多个小文件打包成一个 split,减少 Map 任务数量。
- Map 阶段:
- 减少 spill 次数:通过调整 io.sort.mb(内存缓冲区大小)和 sort.spill.percent(溢写触发使用内存百分比)减少内存数据溢写到硬盘的次数。
- 减少 merge 次数:通过调整 io.sort.factor(同时合并的文件数量)参数来减少合并次数。
- 减少 I/O:在 Mapper 输出前使用 Combiner 函数做局部聚合,从而减少 map-to-reduce 的数据量。
- Reduce 阶段:
- 合理设置 Map 和 Reduce 数量:避免设置过少造成任务等待,或者设置过多导致资源浪费。
- 优化 start-up lag:调整 slowstart.completedmaps 参数允许 Reduce 任务在 Map 任务完成一定比例后启动。
- 规避使用 Reduce:在合适的场景下,避免使用 Reduce 阶段可以减少网络传输开销。
- 调整 Reduce 输入 Buffer:通过调整 mapred.job.reduce.input.buffer.percent 参数来直接完成部分 Reduce 作业省去写磁盘的步骤。
- I/O 传输:
- 使用数据压缩:在 Map 输出、Reduce 输出或者中间传输阶段使用压缩减少数据量,节省 I/O 时间。
- 使用 SequenceFile 文件格式:采用二进制存储格式可以提高 I/O 效率并支持更有效的压缩。
- 数据倾斜问题:
- 对于出现的数据倾斜问题,可以通过第一步的收集数据倾斜现象,然后采用多种方法减轻:
- 抽样和范围分区:使用数据抽样设定合理的分区键值范围,确保数据均衡分布到每个 reducer。
- 自定义分区:为特殊键或重载键自定义分区策略,保持负载均衡。
- Combiner:在 Map 端提前进行一次局部聚合,减少数据传输量,降低数据倾斜影响。
- 对于出现的数据倾斜问题,可以通过第一步的收集数据倾斜现象,然后采用多种方法减轻: