Spark 常见面试问题
本文最后更新于 2024年1月10日 凌晨
Spark 常见面试问题
基础
组件介绍
Spark Core:
功能:Spark 的核心引擎,提供基本的分布式任务调度、内存管理和错误恢复功能。包括有向循环图(DAG)、RDD(弹性分布式数据集)、Lingage(操作链)等基础组件。
应用场景:通用的大数据处理任务,提供基本的数据处理能力。
Spark Streaming:
功能:用于实时数据流处理,支持高通量和容错性,可以对多种数据源进行处理,包括 Kafka、Flume、Twitter 等。提供类似 Map、Reduce 和 Join 等操作,将流式计算分解成短小的批处理作业。
应用场景:需要实时处理数据流的应用,如实时监控、实时分析等。
Spark SQL:
功能:允许开发人员使用 SQL 命令进行对关系表和 RDD 进行查询,实现关系型数据和分布式数据的统一处理。支持 Hive 查询语言(HQL),可以与 Hive 整合。
应用场景:需要对结构化数据进行 SQL 查询和分析的场景。
BlinkDB:
功能:用于大规模并行查询,支持交互式 SQL 查询。通过权衡数据精度和查询响应时间,用户可以控制数据的精度范围。
应用场景:在大规模数据上运行交互式 SQL 查询的场景,允许在查询时权衡精度和性能。
MLlib(Spark Machine Learning Library):
功能:是 Spark 生态系统中的机器学习库,提供常见的机器学习算法和工具,使机器学习任务更易于实现。包括分类、回归、聚类、协同过滤等算法。
应用场景:需要进行机器学习任务的应用,如分类、聚类、预测等。
GraphX:
功能:用于图和图并行计算,提供了图处理的 API,支持图的创建、变换和计算。适用于解决图结构的算法问题,如社交网络分析、路径计算等。
应用场景:需要进行图计算的应用,如社交网络分析、推荐系统等。
Spark 与 MapReduce 对比
Apache Spark 和 MapReduce 是两种不同的大数据处理框架,它们虽然都支持大规模的数据集处理,但是在架构和执行模型上有着显著的不同。以下是对 Spark 和 MapReduce 进行对比的整理和总结:
性能:基于内存计算
- Spark: 利用内存进行数据处理,减少了与磁盘的交互。对于迭代算法和交互式数据挖掘任务,内存计算显著提高了处理速度。
- MapReduce: 处理数据过程中,需要把每个 Map 和 Reduce 阶段的中间结果写入磁盘,增加了 IO 开销,并造成延迟。
调度和执行模型:基于 DAG 的调度
- Spark: 通过 DAGScheduler 实现了精细的阶段划分,减少了不必要的计算和中间数据存储。
- MapReduce: 更传统的两阶段执行,为每个任务生成两个阶段(Map 和 Reduce),缺乏细粒度的任务控制与优化。
容错:血缘机制
- Spark: 采用血缘(Lineage)记录来重建丢失的数据分区,快速且高效。
- MapReduce: 每个 Map 和 Reduce 阶段完成后,数据会被写入磁盘,便于恢复,但相对效率较低。
并行计算模型:
- 两者共同特点: 都是分布式处理大规模数据集,允许数据并行处理。
- Spark: 基于弹性分布式数据集(RDD)提供编程模型,支持迭代算法以及复杂的数据流处理。
- MapReduce: 基础的 Map 和 Reduce 编程模型相对简单,适合一次性的数据处理流程。
执行方式和 API 表达能力:
- Spark: 提供了丰富的操作,如 Map、Filter、Join、GroupBy 等,灵活地表达复杂的计算逻辑,支持直接在内存中处理数据,减少了大量磁盘 IO。
- MapReduce: API 和表达能力相对有限,要完成复杂的数据处理流程可能需要串联多个 Job。
数据处理:
- Spark: 将中间数据保存在内存中,减少了读写磁盘的次数,提升了速度。
- MapReduce: 中间数据默认写入磁盘,适合顺序访问,但在迭代或交互式查询中效率低下。
综合来讲,Spark 比 MapReduce 更加灵活高效,尤其当涉及到需要迭代计算的数据分析任务时。Spark 的内存计算模型、DAG 调度和丰富的 API,使得它能够高效处理各种复杂的数据处理任务,并且适应更广泛的大数据处理场景。然而,MapReduce 在处理极大规模的数据处理作业时依然有其稳定可靠的优势,并且是 Hadoop 生态系统的基础部分。
系统架构
主要组件
- **Cluster Manager (Master)**:负责整个集群资源的管理和分配。Standalone 模式下,它是 Master 节点。而在 YARN 模式中,Cluster Manager 由 YARN 的 ResourceManager 充当。
- Worker Node:这些是集群中的从节点,负责维护和提供计算资源(比如 CPU、内存),在 Spark 中也称之为 Slave 节点。它们实际上托管着 Executor 进程。
- Driver:Driver 是负责将应用程序的 main () 函数转换为任务执行作业,并且分发作业到 Executor 上执行。Driver 进程也负责任务的调度、结果的返回给用户以及执行恢复和状态存储等。
- Executor:Executor 是运行在 Worker Node 上的一个进程,为 Spark Application 执行任务。每个 Application 都有一组专门的 Executor 进程。Executor 负责执行任务、以及将数据和结果进行读写到存储系统。
工作流程
- 用户提交应用程序给 Cluster Manager 启动 Driver。
- Driver 会向 Cluster Manager 申请资源用来启动 Executor。
- Cluster Manager 会启动资源调度,为 Driver 分配 Executor。
- Driver 会将计算任务分配到 Executor,Executor 开始执行计算任务。
- Executor 计算完毕后,结果会发送给 Driver,Driver 最终将结果返回给用户。
部署模式
- 本地模式:
- 特点: Spark 应用在本地运行,不依赖 Hadoop 集群,可用于方便调试。
- 子类:
local
:只启动一个 Executor。local[k]
:启动 k 个 Executor。local[*]
:启动与 CPU 数相同的 Executor。
- Standalone 模式:
- 特点: 分布式部署集群,Spark 自带完整的服务,包括资源管理和任务监控。
- 用途: 作为其他模式的基础,可以独立部署。
- Spark on YARN:
- 特点: 分布式部署集群,将资源和任务监控交给 YARN 管理。支持两种运行模式:
- Cluster 模式: 适合生产环境,Driver 运行在集群子节点,具备容错功能。
- Client 模式: 适合调试,Driver 运行在客户端。
- 特点: 分布式部署集群,将资源和任务监控交给 YARN 管理。支持两种运行模式:
- Spark on Mesos:
- 特点: 提供了灵活且自然的运行环境。支持两种调度模式:
- 粗粒度模式(Coarse-grained Mode): 每个应用程序的运行环境由一个 Driver 和多个 Executor 组成,内部可以运行多个 Task。在运行之前需要将资源全部申请好,即使不使用也一直占用,直到程序结束后回收资源。
- 细粒度模式(Fine-grained Mode): 提供按需分配资源的调度模式,避免了大量资源浪费。
- 特点: 提供了灵活且自然的运行环境。支持两种调度模式:
数据本地性
在 Apache Spark 中,数据本地性(Data Locality)是调度系统用来最小化网络传输的开销的一种优化策略。Spark 存储和计算数据的方式是尽可能让计算发生在数据所在的位置。这里是对 Spark 数据本地性级别的整理介绍,包括它们在任务调度中的应用。
- PROCESS_LOCAL(进程本地):
- 描述:数据位于将执行任务的相同 JVM 进程中。
- 优势:这是最高级别的数据本地性,意味着数据不需要网络传输即可被处理,从而达到最佳性能。
- NODE_LOCAL(节点本地):
- 描述:数据位于同一节点上,但不与执行任务的 JVM 进程相同。
- 优势:尽管数据不在同一进程中,但仍在同一台机器上,网络传输开销小于远程访问。
- RACK_LOCAL(机架本地):
- 描述:数据和计算任务不在同一节点上,但在同一机架内的不同节点上。
- 特点:网络传输的延迟比节点和进程本地性高,但低于跨机架通信。
- NO_PREF(无优先级):
- 描述:数据在集群中没有特定的优先节点。
- 特点:适用于数据已经广播到所有节点或者位置对计算影响不大的情况。
- ANY(任意位置):
- 描述:数据存储在集群中的任意位置,本地或远程。
- 特点:这是最低级别的本地性,可能涉及跨越机架的数据传输。
数据本地性与任务调度
在 Spark 中,任务调度和确定在哪执行计算与数据本地性紧密相关。具体的过程如下:
DAGScheduler:
程序开始时,SparkContext 构建一个作业的有向无环图(DAG)。DAGScheduler 根据转换的宽依赖(如 shuffle 操作)划分 Stage,并进行初步规划,但此时并未确定数据本地性。
TaskScheduler:
DAGScheduler 将 Stage 划分为任务集(TaskSet),然后 TaskScheduler 负责安排任务到具体的 Executor。它会考虑数据本地性,尽可能将计算任务与数据分布相匹配来提高效率。TaskScheduler 计算出最优的任务放置策略,并优先调度高级别的数据本地性任务。
在 Spark 任务调度中,数据本地性的优化目标是减少数据的网络传输,提高任务的处理速度。Spark 会优先选择 PROCESS_LOCAL
,然后是 NODE_LOCAL
,接着是 RACK_LOCAL
,最后才是 ANY
。根据不同工作负载的特性,Spark 的调度器会在尽可能获得更好数据本地性的同时确保计算资源的合理利用。
Master/Worker/Executor 的 HA
在 Spark 中,实现了对 Master、Worker、以及 Executor 的高可用(High Availability,HA)机制,以下是各种 HA 的具体处理方式:
Master 异常处理(High Availability for Master)
Spark 在 Master 异常处理方面提供了多种配置选项:
- ZOOKEEPER: 在这种模式下,集群的元数据信息会被持久化到 ZooKeeper 中。当 Master 出现异常时,ZooKeeper 通过选举机制选择新的 Master,并由新的 Master 接管集群。
- FILESYSTEM: 在这种模式下,集群的元数据信息持久化到本地文件系统。当 Master 出现异常时,只需要在该机器上重新启动 Master,新的 Master 通过获取持久化信息并根据这些信息恢复集群状态。
- CUSTOM: 用户可以自定义恢复方式,实现并配置
standloneRecoveryModeFactory
抽象类。当 Master 出现异常时,系统会根据用户自定义的行为恢复集群。 - None: 在这种模式下,不持久化集群的元数据。当 Master 出现异常时,新启动的 Master 不进行集群状态的恢复,而是直接接管集群。
Worker 异常处理(High Availability for Worker)
Worker 定期向 Master 发送心跳,通知 Master 自己的实时状态。当 Worker 出现超时时,Master 会调用 timeOutDeadWorker
方法进行处理,具体处理方式如下:
- 对于 Executor:Master 先通过发送信息
ExecutorUpdate
给对应的 Driver,告知 Executor 已经丢失,并将这些 Executor 从其应用程序列表删除。此外,还需要处理相关 Executor 的异常情况。 - 对于 Driver:Master 判断是否设置重新启动。如果需要重新启动,Master 调用
Master.schedule
方法进行调度,在合适的节点上重新启动 Driver。如果不需要重新启动,则删除该应用程序。
Executor 异常处理(High Availability for Executor)
- 当 Executor 发生异常时,由
ExecutorRunner
捕获该异常并发送ExecutorStateChanged
信息给 Worker。 - Worker 接收到消息后,在
Worker
的handleExecutorStateChanged
方法中,根据 Executor 状态进行信息更新,并将 Executor 状态发送给 Master。 - Master 在接收到 Executor 状态变化消息后,如果发现 Executor 异常退出,会尝试使用可用的 Worker 节点重新启动 Executor。
Spark 的内存管理机制
Spark 的内存管理机制在不同版本中有所变化。在 Spark 1.6 版本之前,存在一些问题,而在之后的版本中进行了改进。
Spark 1.6 版本之前的问题
- 固定内存大小: 旧方案中,storage 和 execution 的内存大小都是固定的,无法动态调整。即使 execution 内存有大量空闲,而 storage 内存不足,也不能共享内存,只能进行数据溢写(spill)。
- 资源浪费: 由于 storage 和 execution 内存无法相互借用,导致在某些情况下存在资源浪费的问题。
- Off-Heap 支持的局限性: 旧方案中,只有 execution 内存支持 Off-Heap 存储,而 storage 内存不支持 Off-Heap 存储。
新方案的改进
新方案对内存管理进行了改进,解决了上述问题:
- 内存动态调整: storage 和 execution 内存可以互相借用。当一方内存不足时,可以向另一方借用内存,从而提高了整体资源的利用率。
- Off-Heap 支持: 新方案中,不仅 execution 内存,而且 storage 内存均支持 Off-Heap 存储,增强了内存的灵活性。
- 误差缓冲: 系统自留的部分用作误差缓冲。由于 storage 和 execution 内存使用是估算的,存在一定误差。当其中一方内存使用超过限制时,系统自留的部分可以作为安全的误差缓冲,降低了发生 OutOfMemory 错误的概率。
这些改进使得新的内存管理方案更加灵活和高效,提升了整个 Spark 应用的性能和稳定性。
常用提交作业参数
Apache Spark 提供了一组配置参数,用于在提交作业时调整资源分配和执行行为。正确设置这些参数对于保证作业性能和资源利用率至关重要。以下是一些在提交 Spark 作业时重要的参数:
--executor-cores
定义了每个 Executor 使用的 CPU 核心数量。默认值为 1,但根据任务需求和集群配置,通常设置为更高的值(一般为 2-5)。根据实际企业环境调整,一些企业可能将其设置为 4。--num-executors
指定了要启动的 Executor 的数量。默认值为 2,但为了优化性能和资源利用,通常需要根据数据的大小和并行度设置更多的 Executor。--executor-memory
控制每个 Executor 的内存大小,默认值为 1 GB。进行资源分配时需要根据应用程序的内存需求来调整此参数。--driver-cores
设置驱动程序节点可用的 CPU 核心数,默认为 1。根据驱动程序的计算负荷,可能需要更多的核心。--driver-memory
指定了驱动程序可用的内存大小,默认值为 512 MB。对于内存密集型作业,需要提高这个参数的值以避免内存溢出。
通过调整这些参数,可以定义 Spark 作业在集群上运行时的资源配置。参数设置应综合考虑作业的特性、数据量、集群的资源状况等多方面因素。下面是一个提交 Spark 作业的典型命令样式:
1 |
|
在这个例子中:
- 使用
--master
指定了运行模式为本地模式,并用 5 个线程对应 5 个 CPU 核心。 --driver-cores
和--driver-memory
为驱动程序分配了 2 个 CPU 核心和 8 GB 内存。--executor-cores
、--num-executors
和--executor-memory
为每个 Executor 分配了 4 个 CPU 核心、启动了 10 个 Executor 以及每个 Executor 8 GB 内存。--class
指定了要运行的类的入口点。--name
设置了这个 Spark 作业的名称。- 最后提供了 JAR 包的路径和输入输出数据的路径。
正确配置这些参数是优化 Spark 应用性能的关键一步。考虑到集群的实际资源和作业需求,通常需要进行多次实验以找到最优的参数配置。
Yarn
工作机制
- 初始化 Spark 应用程序(Driver 启动):
- 用户编写 Spark 代码,构建 Application。
- 当 Application 运行时,首先在驱动器(Driver)程序中创建 SparkContext 实例。
- SparkContext 在 Spark 的作业执行中充当协调者的角色。
- 资源申请:
- SparkContext 向集群管理器(可为 Standalone、Mesos 或 YARN)申请资源用以初始化 Executor。
- Cluster Manager(资源管理器)根据资源的可用情况,在集群的 Worker 节点上启动 Executor 实例。
- 任务执行准备:
- Executor 向 Driver 注册,随后 SparkContext 把应用程序里的代码(JARs 或者 Python 文件)发送到 Executor。
- 这样,Executor 就有了执行任务所需的所有计算资源和代码库。
- DAG 构建与任务分配:
- Driver 中的 SparkContext 根据用户的应用程序代码构建一个 DAG。
- DAGScheduler 根据生成的 DAG 将作业逻辑分解成多个 Stage(可以并行或顺序执行的任务集合)。
- TaskScheduler 接收这些 Stage,将它们分解成许多 Task,并将这些 Task 分配给对应的 Executor。
- 任务执行:
- Task 在 Executor 上运行。当 Executor 上的 Task 运行完成后,Executor 会将结果返回给 Driver。
- 如果 Task 需要从其他 Executor 读取数据,该数据通过网络传输。
- 资源释放和作业完成:
- Task 执行完成后,并且 Driver 已经从所有 Executor 获取了结果,SparkContext 会关闭并释放资源。
- 最终结果返回给用户,Spark 应用程序结束。
Spark 的设计允许它有效地运行重复的数据处理任务,利用内存计算和优化的执行计划,大大提高了处理速度和系统的可扩展性。
执行流程
上述是一个 Spark 程序的基本执行流程,下面是对流程的详细解释:
Apache Spark 应用的执行涉及一系列分布式组件的相互作用,从驱动程序(Driver)的启动到作业(Job)和任务(Task)的创建及执行。以下是精练的概述来描述这个过程:
应用初始化和资源分配
- 应用提交和启动: 用户通过
spark-submit
提交应用,驱动程序启动,执行 main 函数并初始化SparkContext
。 - SparkContext 初始化: SparkContext 创建后,初始化包括
DAGScheduler
和TaskScheduler
在内的关键调度组件。 - 与 Spark Master 交互: 通过驱动程序中的
SparkContext
,Driver 向 Spark Master 注册,请求集群资源以运行 Spark 应用。 - 资源分配与 Executor 启动: Spark Master 根据资源调度策略,在集群的 Worker 节点上为应用启动 Executor 进程。
- Executor 注册: 启动的 Executor 向
TaskScheduler
反向注册自己,表明它们准备接受任务。
任务执行流程
代码执行与 Job 生成: 用户编写的代码被驱动程序执行到 Action 操作时,会生成一个新的 Spark Job。
Job 分解为任务:
DAGScheduler
接手 Job,将其分解为更小的执行单元(Stages),并创建相应的 TaskSet。Task 调度与分发:
TaskScheduler
负责将每个 TaskSet 内的任务分发到可用的 Executor 进行处理。任务执行: Executor 中的线程池接收任务,并使用
TaskRunner
执行真实的计算。每个 Task 负责处理 Partition 数据并计算结果。结果回传与聚合: 执行后的结果通过网络传回驱动程序,其中包括聚合操作。
循环处理直至应用完成
Job 循环执行: 以上过程反复进行,每个 Action 操作都触发新的 Job。驱动程序持续创建、提交并调度 Job 直到应用执行完毕。
应用结束: Spark 应用执行完毕,
SparkContext
会关闭,回收所有资源。
在这个过程中,数据本地性原则也被用来优化任务的执行,以减少数据传输带来的延迟和带宽成本。整个流程涉及层层调度与资源管理,目的是确保在一个动态并行环境中高效地执行 Spark 应用。
这个流程说明了 Spark 应用程序的执行过程,其中关键的概念包括 Driver、Executor、RDD、Job 和 Task。整个执行过程是按照逻辑依赖关系组织的有向无环图(DAG)进行的。
Spark on Yarn 执行流程
Spark on Yarn 模式的优点
Apache Spark 可以运行在 YARN(Yet Another Resource Negotiator)之上,这是 Hadoop 2. x 引入的集群资源管理系统。当 Spark 以 YARN 模式运行时,可以从这种集成中获得多项优势:
- 资源共享: YARN 允许多种数据处理引擎(如 Spark、MapReduce 等)共存于同一个集群中,它会调和资源分配,以实现系统资源(如内存、CPU 等)的最大化利用。这就意味着企业可以在同一个集群上运行 Spark 作业,而不需要单独的 Spark 集群。
- 资源按需分配: YARN 可以动态分配资源,确保每个应用程序根据当前需求获得适量资源。当应用程序不运行时,它不会占用任何资源,从而提高整个集群的资源利用率。
- 资源分配细致: YARN 允许对内存和 CPU 资源进行细粒度的控制。与 Spark 的 standalone 模式相比,YARN 可以更加精细地进行资源分配,以匹配不同工作负载的特定需求。
- 简化部署与管理: YARN 提供的集群管理功能简化了部署和运维流程。无论是 Spark 还是其他如 Storm 等框架,都可以利用 YARN 进行统一调度和资源管理,降低了管理的复杂性。
- 队列管理与资源弹性: YARN 通过队列机制来管理 cluster 资源,可以为不同队列设置不同的优先级和资源限制。在资源需要时可以过按照策略弹性扩展,适应多种类型应用并行运行场景。
利用 Spark on YARN 运行模式,可以更好地整合到已有的 Hadoop 生态系统内,并且相较于专用的 Spark 集群,既节省成本又提升资源利用效率。这对于需要长时间运行多种类型作业的大数据工作负荷来说是一个很大的优势。
任务阶段划分 App、Job、Stage、Task
在 Apache Spark 中,App(任务)、作业(Job)、阶段(Stage)和任务(Task)是执行和优化计算的关键概念。它们的划分有助于提高 Spark 集群上作业的执行效率。
Application(任务): 初始化一个 SparkContext 即生成一个 Application;
Job(作业):
- 定义: 一个 Spark 作业是由一组相互关联的 RDD 转换和动作操作组成的,这些操作形成一个有向无环图(DAG)。当用户在 Spark 应用程序中触发一个 Action 算子时,Spark 将启动一个新的作业来执行相关的 RDD 转换和动作。
- 划分: 一个 Action 算子就会生成一个 Job。通常,一个 Spark 应用程序由多个作业组成,这些作业之间是相对独立的。每个作业都包含一个或多个阶段。
Stage(阶段):
- 定义: 一个作业被划分为若干个阶段,每个阶段包含一组具有相同依赖关系的任务。一个阶段内的所有任务可以并行执行,而不同阶段之间存在依赖关系。
- 划分: 一个宽依赖对应一个 Stage。阶段的划分是基于 Shuffle 操作的存在与否,具有 Shuffle 操作的转换(例如 groupByKey、reduceByKey)会导致阶段的划分,因为 Shuffle 操作需要数据重新分区。
Task(任务):
- 定义: 一个阶段包含一组并行执行的任务,每个任务对应于 RDD 的一个分区。任务是 Spark 中最基本的执行单元,负责执行特定阶段中的具体计算。
- 划分: 一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。任务的划分是基于 RDD 的分区。每个任务负责处理一个或多个输入分区的数据,执行相同的计算逻辑。
划分关系:
- 一个作业包含多个阶段,每个阶段包含一个或多个任务。
- 一个阶段通常对应于一个 Narrow Dependency,而具有 Shuffle 操作的转换(例如 groupByKey、reduceByKey)可能会引入新的阶段。
- 任务的数量通常等于输入 RDD 的分区数量。
在 Spark 中,作业、阶段和任务的划分是为了提高并行性和优化执行计划,使得 Spark 应用程序能够更有效地利用集群资源。这样的划分有助于 Spark 执行引擎有效地调度和执行任务,以提高整体性能。
宽依赖与窄依赖
Stage 的划分依据就是看是否产生了 Shuflle (即宽依赖), 遇到一个 Shuffle 操作就划分为前后两个 Stage.
在 Spark 中,依赖关系分为宽依赖和窄依赖:
窄依赖(Narrow Dependency):
- 窄依赖表示每个父分区最多对应一个子分区,这种依赖关系是一对一的。
- 窄依赖的转换操作不需要进行 Shuffle,每个父分区的数据只会影响到对应的子分区,因此计算是高效的。
- 窄依赖的例子包括 map、filter、union 等操作,这些操作不会引起数据重分布。
宽依赖(Wide Dependency): - 宽依赖表示每个父分区可能对应多个子分区,这种依赖关系是一对多的。
- 宽依赖的转换操作通常需要进行 Shuffle,即数据重分布,因为多个父分区的数据需要合并到一个子分区中,这会引起网络传输和磁盘 IO,计算成本较高。
- 宽依赖的例子包括 groupByKey、reduceByKey、sortByKey 等需要进行 Shuffle 的操作。
宽依赖和窄依赖的主要区别在于数据的重分布:
- 窄依赖: 父分区与子分区之间是一对一的关系,计算是局部的,不需要数据重分布,因此效率较高。
- 宽依赖: 父分区与子分区之间是一对多的关系,需要进行 Shuffle 操作,即数据的全局重分布,计算成本相对较高。
在 Spark 的执行计划中,每个 Stage 代表一组并行计算任务,Stage 的切分通常是基于宽依赖的转换操作,因为这些操作需要引入 Shuffle 过程。 Stage 决定了 Spark 作业的执行顺序和数据流向。
Spark 的内存模型
Spark 提供了丰富的内存管理功能,通过灵活配置,可以优化任务执行性能。以下是 Spark 的主要内存模块:
- 堆内存(Heap Memory):
- 堆内存用于存储 Spark 应用程序的对象数据,包括 RDD、DataFrame、DataSet 等。
- Spark 的堆内存主要用于存储元数据、变量和执行上下文等信息。
- 存储内存(Storage Memory):
- 存储内存用于缓存经常访问的数据,如 RDD 的缓存、数据框架的表格数据等。
- 存储内存的主要目的是减少磁盘 IO,通过在内存中缓存数据,加速后续操作。
- 执行内存(Execution Memory):
- 执行内存用于 Spark 执行计算和转换操作时的数据缓存和计算中间结果。
- 在执行过程中,Spark 将数据加载到执行内存中,进行各种计算操作,以提高性能和减少磁盘 IO。
- 优化的存储内存(Off-Heap Memory):
- 除了堆内存,Spark 还提供了一部分优化的存储内存,用于存储序列化的数据以及执行过程中的中间结果。
- 这部分存储内存一般会比堆内存具有更低的垃圾回收开销,并且可以提供更大的工作空间。
- 共享内存(Shared Memory):
- 共享内存用于跨任务之间的数据共享和协作,在同一个节点的不同任务间共享数据。
- 共享内存可以大幅减少数据的拷贝和网络传输,提高处理效率。
通过这些内存模块的组合,Spark 能够高效地管理数据和执行过程中的计算,提供高性能和可伸缩性。在实际使用中,可以根据应用程序的需求进行相关配置,以优化内存的利用和任务的执行。
Spark 并行度
在 Spark 中,合理设置并行度对于优化作业的性能至关重要。并行度决定了运行计算的任务数以及数据被切分的方式。以下是并行度的设置细节和依据:
- 任务的资源情况:
可以按照集群中 CPU 核心的数目来估算并行度。一个经验规则是每个 CPU 核心分配 2 到 4 个任务,这可确保 CPU 资源得到充分利用而不至于过载。
例如,如果有一个拥有 32 个 CPU 核心的集群,合理的并行度范围可能在 64 到 128 之间。这个设置可以确保每个核心在工作时都有足够的任务来保持忙碌,并且可以在其他任务阻塞时切换到备用任务。 - 数据量大小:
通常,并行度应至少和 RDD 中的分区数量一致,以确保所有数据都参与计算。对于大数据量,可能需要更多的分区来增加并行度,减少每个分区的数据量,让处理过程在资源限制下更高效。 - 内存与硬件容量:
并行度不应该设置得过高,以避免每个任务只处理很少量的数据,从而导致大量的调度和管理开销。同时,要根据可用的内存量和硬件性能进行调整,以避免由于太多任务而造成的内存溢出或 GC 问题。 - CPU 使用时间与内存使用量:
并行度的设置并不直接与数据规模相关,而是和 CPU 使用时间以及内存使用量有关。应该基于执行任务所需要处理的数据量以及处理时间来调整并行度,确保在有限的资源中得到最优的吞吐量。 - I/O 性能:
如果任务涉及大量 I/O 操作,比如读写数据库或网络存储,太高的并行度可能会导致 I/O 性能瓶颈。这时需要根据存储系统的 I/O 性能来调整并行度。 - Shuffle 操作:
对于会产生 Shuffle 的操作,如reduceByKey
,合理的分区数量能够减少在 Shuffle 阶段的数据传输,提升效率。
结论:理想的并行度设置取决于作业的具体需求和集群的资源情况。最佳设置通常需要根据作业的性能监控结果进行调试和调整。对于特定作业,也可以通过 Spark UI 来观察和调优任务的执行情况。此外,Spark SQL 的 spark.sql.shuffle.partitions
和 RDD 操作的 spark.default.parallelism
这两个配置选项是设置并行度的主要手段。
Cluster 模式和 Client 模式
在 Spark on YARN 模式下,Cluster 模式和 Client 模式的主要区别在于 Application Master 进程的位置和角色:
- Yarn-cluster 模式:
- 适用场景: 适用于生产环境,其中整个应用程序(Driver 和 Executors)运行在 YARN 集群中。
- Application Master: Driver 运行在 YARN Application Master 中,负责向 YARN 申请资源,并监督整个作业的运行状况。
- Client 关系: 用户在提交了作业之后可以关闭客户端(Client),作业会继续在 YARN 上运行,不依赖于客户端。
- Yarn-client 模式:
- 适用场景: 适用于交互和调试,即用户希望快速查看应用程序输出的场景。
- Application Master: Application Master 仅向 YARN 请求 Executors,而 Driver 运行在客户端(Client)进程中。客户端会与请求的 Executor 容器通信来调度它们的工作。
- Client 关系: Client 不能离开,它与 Executor 容器通信,负责整个作业的调度。
总体而言,选择 YARN Cluster 模式还是 YARN Client 模式取决于应用程序的性质和用户的需求。YARN Cluster 模式适用于生产环境,而 YARN Client 模式更适合开发和调试,以便用户可以与应用程序的执行交互并即时查看输出。
RDD
在 Apache Spark 中,RDD(弹性分布式数据集)是最基本的数据抽象,它是一个可被并行操作、可容错的分布式数据集合。以下是 RDD 的核心特点和相应的算子介绍:
RDD 特点
- 分区(Partitioning): RDD 数据被分割成多个分区,分区是并行处理的基本单位。RDD 可以分布在集群的多个节点上进行并行处理。
- 容错性(Fault Tolerance): RDD 的容错能力来自于其谱系图(lineage),记录了从一个稳定的数据源(如 HDFS)生成 RDD 的一系列转换。在部分分区数据丢失的情况下,Spark 可以重新应用这些转换来恢复丢失的数据,而无需重新开始整个计算。
- 不可变性(Immutability): RDD 是不可变的,也就是说一旦创建,它的数据就不会改变。对 RDD 的所有转换操作都会产生一个新的 RDD。
- 缓存与持久化(Caching and Persistence): RDD 提供了将数据缓存在内存中的能力,这样就可以在多个不同的操作中快速访问这些数据,减少磁盘 I/O 的开销,提高整个数据处理流程的效率。
- 惰性计算(Lazy Evaluation): RDD 操作具备惰性计算的特性,即转换操作仅仅定义了新 RDD 的逻辑;只有在触发行动操作时,实际的计算才会发生。
- 可选属性(Optional Features): 对于键值对类型的 RDD,可指定 Partitioner 控制键的分区策略。此外,每个分区会有一个优选位置列表,建议在哪些节点上执行,以减轻网络通信的负担。
RDD 弹性
在 Spark 中,RDD(Resilient Distributed Datasets)具有弹性的特性,体现在以下几个方面:
- 自动的进行内存和磁盘的存储切换: 当 RDD 的数据无法完全放入内存时,Spark 会自动将部分数据持久化到磁盘,以保证数据的可靠性和可用性。这种自动的存储切换机制使得 RDD 具有更好的弹性。
- 基于 Lineage 的高效容错: RDD 使用 Lineage 记录了其创建过程,即转化操作的顺序和依赖关系。当某个分区的数据丢失时,Spark 可以通过 Lineage 重新计算该分区的数据,而不需要重新计算整个数据集,提高容错性能。
- Task 如果失败会自动进行特定次数的重试: 在 Spark 中,当一个 Task 执行失败时,Spark 会自动进行有限次数的重试,尝试恢复失败的 Task,增加了任务执行的鲁棒性。
- Stage 如果失败会自动进行特定次数的重试: 类似于 Task 的重试机制,Spark 也对 Stage 提供了有限次数的重试机制。当一个 Stage 执行失败时,Spark 会尝试重新执行失败的 Stage,从而提高整体计算的可靠性。
- Checkpoint 和 persist,数据计算之后持久化缓存: Spark 允许用户通过
checkpoint
或persist
操作将 RDD 的计算结果持久化到存储系统,以便在后续的计算中重用。这种持久化机制可以降低由于计算过程中的故障导致的数据丢失风险。 - 数据调度弹性,DAG TASK 调度和资源无关: Spark 使用 DAG(有向无环图)调度模型,通过对 RDD 转化操作的依赖进行建模,使得调度具有弹性,不依赖于特定的资源管理系统。这使得 Spark 可以在各种集群管理器上运行,并更好地适应不同的环境。
- 数据分片的高度弹性: RDD 将数据分为多个分片,每个分片都是一个不可变的数据集。这种分片的设计使得 Spark 能够在计算过程中发生节点故障时,仅重新计算丢失的分片,而不需要重新计算整个数据集。这增加了整体计算的弹性和容错性。
RDD 算子
算子是用于在 RDD 上执行计算的函数,分为两种类型:转换算子和行动算子。
转换算子(Transformations): 如 map
(应用函数到每个元素上)、filter
(筛选出符合条件的元素)、flatMap
(将函数应用到每个元素后,扁平化输出结果)等。转换算子会创建一个新的 RDD。
行动算子(Actions): 如 count
(返回元素总数)、collect
(收集 RDD 所有元素到驱动程序)、reduce
(通过一个函数来聚合所有元素)等。行动算子会触发实际计算,并且可能输出到外部系统或返回到驱动程序。
Transformation 算子(变换/转换算子):
1. Value 数据类型的 Transformation 算子:
一对一型:
map(func)
: 对 RDD 的每个元素都应用一个函数。flatMap(func)
: 对 RDD 的每个元素都应用一个返回迭代器的函数,将所有结果展开成一个单一的序列。
多对一型:
union(otherDataset)
: 返回包含两个 RDD 元素的并集的新 RDD。cartesian(otherDataset)
: 返回包含两个 RDD 中所有元素对的新 RDD。
多对多型:
groupByKey()
: 对具有相同键的元素进行分组。reduceByKey(func)
: 在每个键的所有值上应用 reduce 操作。aggregateByKey(zeroValue)(seqOp, combOp)
: 在每个键的所有值上应用聚合操作。sortByKey()
: 根据键排序。
输出分区为输入分区子集型:
filter(func)
: 返回一个由通过函数 func 计算的所有元素组成的 RDD。distinct(numPartitions)
: 返回一个仅包含不同元素的新 RDD。subtract(otherDataset)
: 返回一个只包含在该 RDD 中而不在其他 RDD 中的所有元素的新 RDD。sample(withReplacement, fraction, seed)
: 返回 RDD 的随机样本子集。takeSample(withReplacement, num, seed)
: 返回一个固定大小的 RDD 的随机样本。
Cache 型:
cache()
: 将 RDD 的数据持久化到内存中。
2. Key-Value 数据类型的 Transformation 算子:
一对一型:
mapValues(func)
: 对 RDD 的每个键值对的值应用一个函数。
对单个 RDD 或两个 RDD 聚集:
combineByKey(createCombiner, mergeValue, mergeCombiners)
: 使用不同的返回类型和合并值函数的方式来聚合键值对元素。reduceByKey(func)
: 在每个键的所有值上应用 reduce 操作。partitionBy(numPartitions, partitioner)
: 返回分区好的 RDD。
连接:
join(otherDataset)
: 对两个 RDD 进行内连接。leftOuterJoin(otherDataset)
: 对两个 RDD 进行左外连接。rightOuterJoin(otherDataset)
: 对两个 RDD 进行右外连接。
Action 算子(行动算子):
无输出:
foreach(func)
: 对 RDD 的每个元素应用一个函数。
HDFS 算子:
saveAsTextFile(path)
: 将 RDD 的元素保存为文本文件。saveAsObjectFile(path)
: 将 RDD 的元素保存为序列化的 Java 对象文件。
Scala 集合和数据类型:
collect()
: 将 RDD 的所有元素以数组的形式返回到驱动程序。collectAsMap()
: 将 RDD 中的元素(键值对)以 Map 的形式返回。reduceByKeyLocally(func)
: 在本地执行按键缩小操作。lookup(key)
: 返回与给定键相关联的所有值的列表。count()
: 返回 RDD 中的元素数。top(num)
: 返回 RDD 中的前 N 个元素。reduce(func)
: 使用指定的二进制运算符减少元素。fold(zero)(func)
: 折叠元素,与 reduce 类似,但提供了零值。aggregate(zeroValue)(seqOp, combOp)
: 聚合元素。countByValue()
: 按值计数 RDD 中的元素。countByKey()
: 按键计数 RDD 中的元素。
这些算子提供了丰富的功能,使得可以在 Spark 中进行灵活而高效的数据处理。
RDD 创建
在 Apache Spark 中,RDD(弹性分布式数据集)是一个基础的、不可变的分布式数据集合。创建 RDD 的方式有多种,以下是一些常见的方法:
- 使用程序中的集合(内存中的数据)创建 RDD:
可以通过SparkContext
的parallelize
方法,将程序中的一个已存在的集合(例如 Array 或 List)转换成 RDD。
1 |
|
- 使用本地文件系统创建 RDD:
利用SparkContext
的textFile
方法,可以从本地文件系统中的文件创建 RDD。
1 |
|
- 从 HDFS(Hadoop 分布式文件系统)创建 RDD:
类似于从本地文件系统读取,textFile
方法也可以用来从 HDFS 读取数据创建 RDD。
1 |
|
- 基于数据库 DB 创建 RDD:
使用各种数据库连接器或 JDBC 来读取关系数据库中的数据,并将其转换为 RDD。
1 |
|
- 基于 NoSQL 创建 RDD(例如 HBase):
可以使用对应 NoSQL 数据库的连接器,如 HBase Connector,从非关系数据库中创建 RDD。
1 |
|
- 基于 Amazon S3 创建 RDD:
Spark 可以直接从 Amazon S3 存储服务中读取数据创建 RDD。S3 在这里像是一个文件系统一样被使用。
1 |
|
- 基于数据流创建 RDD(例如 Socket):
Spark Streaming 可以从实时数据源,如 TCP 套接字连接或 Kafka,创建一个称为 DStream 的特殊类型 RDD。
1 |
|
这些方法反映了 Spark 非常适合于处理各种来源的大型数据集,无论是静态文件、现有的数据库还是实时数据流。每种创建 RDD 的方式都有其特定的场景和使用案例。
尽量避免的算子
聚合类算子:
- reduceByKey (func): 使用给定的函数 func 对 RDD 中的元素进行聚合,然后对具有相同键的元素执行 reduce 操作。这是一个宽依赖的 shuffle 操作。
- groupByKey (): 根据键对 RDD 中的元素进行分组,将相同键的值放入一个迭代器中。这是一个宽依赖的 shuffle 操作。
- combineByKey (createCombiner, mergeValue, mergeCombiners): 自定义聚合逻辑的高级聚合操作。也是一个宽依赖的 shuffle 操作。
- foldByKey (zeroValue)(func): 类似于 reduceByKey,但是提供了一个零值,可以在每个分区中用于对缺失的键进行折叠。
- aggregateByKey (zeroValue)(seqOp, combOp): 自定义聚合逻辑的高级聚合操作,类似于 combineByKey。
避免的算子:
- join, cogroup: 需要将两个 RDD 的数据进行混洗 (shuffle),性能开销大。
- repartition (numPartitions): 对 RDD 进行重新分区,触发 shuffle 操作。
- distinct (): 去除重复元素,需要进行混洗 (shuffle)。
- sortBy, sortByKey: 对 RDD 进行排序,需要进行混洗 (shuffle)。
- coalesce (numPartitions): 对 RDD 进行缩减分区操作,可以减小分区数,但不会避免 shuffle 操作。
建议:
- 尽量使用窄依赖算子,这些操作不会触发 shuffle,而是在各个分区内独立完成,性能更高效。
- 当必须使用 shuffle 操作时,可以考虑进行持久化,以避免多次执行相同的 shuffle 操作。
在 Spark 中,优化性能的关键是尽量减少 shuffle 操作,因为 shuffle 是非常昂贵的。合理选择算子并考虑数据分区等因素,可以有效提升 Spark 作业的性能。
累加器
Spark 累加器具有以下几个显著特点,并在分布式计算中扮演重要角色:
- 全局唯一性和累加性: 累加器是一种只能进行“加”操作的变量,用于在不同的 Executor 之间安全地执行累加操作。它们是全局唯一的,确保在整个 Spark 作业中有一个一致的值。
- 写时独立,读时共享: 执行器(Executor)中的代码可以对累加器进行更新操作(如增加值),但这些更新只在 Spark 作业的执行过程中生效一次。累加器的更新是在各 Executor 中分别进行的,而累加的结果则最终在 Driver 程序中被读取。这使得累加器特别适合用于跟踪状态或进行计数,如监控任务失败的次数等。
- 在作业与任务间共享: 累加器的作用范围是在整个 Spark 应用程序中,这意味着在同一个应用程序内的不同作业之间可以共享累加器,但累加器不可以跨不同的 Spark 应用程序共享。累加器的值是在 Task 之间不可见的,每个 Task 处理的累加操作只在其完成时提交,随后被合并到 Driver 程序中的累加器中,从而保证了其全局唯一性。
- 容错性: Spark 的累加器设计为只在行动(action)操作中可靠地更新一次,如果包含累加器更新的 Task 因为某些原因需要重新执行,那么这些累加器的更新将会被撤销,不会被重复计算。这样的设计使得累加器在分布式系统中具有容错性。
- 自定义累加器: Spark 允许用户定义自己的累加器,这些累加器可以累加自定义的数据类型。自定义累加器需要通过继承
AccumulatorV2
类并实现相应的方法来创建。
需要注意的是,累加器仅在每个节点上的 Task 执行所需的作业之后可靠地更新一次,并且用户不应依赖在转换(transformation)操作中的累加器行为,因为它们可能在失败后或为了各种优化目的而被 Spark 重新计算。累加器通常用于调试目的或作为计数器来监控 Spark 作业的某些方面,例如数据记录的丢失和过滤计数。
广播变量
Spark 广播变量是分布式计算环境中的一项功能,主要用于有效地将大数据集在所有节点之间共享,具有以下特点:
- 数据共享与优化: 广播变量使用一种高效的广播算法将变量内容分发给所有的节点,节省了网络带宽和减少了数据的重复传输。
- 只读特性: 广播变量在任务中是只读的,这意味着它们可以安全地在多个任务和不同阶段间共享而不会改变,保证了数据的一致性。
- 内存使用效率: 使用广播变量可以避免针对每个任务的多次数据传输,因为数据在每个节点上只需要拷贝一次,从而减少了内存的冗余使用。
- 宽泛的应用场景: 常见用例包括共享较大的输入数据集、共享的机器学习模型参数等情形。
- 高效的分布式数据处理: 广播变量经常和累加器并用,广播变量用于广播端数据,累加器用于收集分布式计算结果。
- 支持懒加载: 广播变量支持懒惰广播,在实际使用之前不会将数据实际传输到节点,优化了资源的使用。
- 易于使用的 API: Spark 提供了简单的 API 来创建和访问广播变量,使得其可以轻松集成到现有的 Spark 应用中。
举个例子,在一个大数据集的计算任务中,如果有一个相对较小但需要频繁访问的数据集合,比如查找表(lookup table)或者特征数据集,可以通过 Spark 的广播变量将这个数据集合广播到集群中的所有节点上,避免了数据集合在任务之间的重复传输,并提高了整体处理效率。
要使用广播变量,你需要使用 SparkContext 的 broadcast
方法创建一个广播变量,然后在 Spark 任务中读取 value
属性来获取其内容。例如:
1 |
|
在任务执行时,myBroadcastVar
会被丢到每个节点上一次,节点间的任务可以共享这个广播变量。如果不广播而是直接使用变量,那么每个任务在执行时都会获取该变量的一份副本,造成不必要的资源浪费。
需要注意的是,尽管广播变量是只读的,但在使用后应当在任务结束时调用 destroy
方法来释放分配的资源。在分布式计算中,留意资源的分配和回收非常重要,涉及到集群的整体性能和稳定性。
ReduceByKey 与 GroupByKey 的区别
reduceByKey
和 groupByKey
都是 Apache Spark 中的转换操作,这两种操作都用于对键值对型 RDD(Key-Value Pair RDD)中的元素按照键(Key)进行分组,并且都会引发 Shuffle 过程。然而,它们在处理数据时的行为和性能方面存在显著的差异。主要区别在于 reduceByKey
具有预聚合操作,而 groupByKey
没有预聚合操作。
reduceByKey:
reduceByKey
合并具有相同键的值,在数据被 Shuffle 到不同的分区之前,尽可能先在每个分区中合并(Map 端合并)。这通常减少了网络传输的数据量,因为经过合并后的数据要小于原始数据。reduceByKey
在 Map 端进行预聚合,有效地优化了 Shuffle 过程。对于像素化计数器、总和、平均值等,这是一个高效的策略。
groupByKey:
相比 reduceByKey
,groupByKey
仅仅是将所有具有相同键的值收集在一起。它不进行任何预聚合(Map 端合并)操作。因此,使用 groupByKey
时,整个网络传输过程中将移动所有数据。这可能导致大量数据在网络上传输,如果数据集非常大的话,这会带来性能问题,尤其是 IO 和网络瓶颈。
选择建议:
在需要对数据进行分组操作的场合,**推荐的做法通常是在不影响业务逻辑的前提下,优先采用reduceByKey
**(比如 foldByKey
、aggregateByKey
),因为它们在 Map 端进行预聚合,减少了 Shuffle 的数据量,提升了处理效率。
而 groupByKey
在以下情况下使用是合理的:
- 当你需要将所有的值按照键集中到一起且不进行任何聚合操作时。
- 如果已知结果的值列表大小是有界的,不至于太大到消耗过多的内存或者引发网络问题时。
在设计 Spark 程序时,合理选择 reduceByKey
或 groupByKey
可以显著影响程序的性能和资源的使用。通常情况下,应优先考虑使用 reduceByKey
来减少资源使用和提高效率。
ReduceByKey、FoldByKey、AggregateByKey、CombineByKey 区别
算子 | 初始值 | 过程 |
---|---|---|
ReduceByKey foldByKey aggregateByKey combineByKey |
没有初始值 有初始值 有初始值 初始值可以变化结构 |
分区内和分区间逻辑相同 分区内和分区间逻辑相同 分区内和分区间逻辑可以不同 分区内和分区间逻辑不同 |
在 Spark 中,reduceByKey
、foldByKey
、aggregateByKey
和 combineByKey
都用于在键值对(pair)类型的 RDD 上执行按键聚合的操作。尽管目的类似,但他们提供了不同层次的灵活性和控制,以应对不同的使用场景。所有这些操作都会触发 shuffle 过程,根据键将数据分布到不同的任务中去。
下面我们逐一解释每个操作,并指出它们各自的差别:
reduceByKey
reduceByKey
对每个键进行聚合操作,该操作必须是可交换且关联的,因为操作会在不同的节点上各自进行并在最后汇总。它有效地在 map 端进行预聚合(即“combiner”),以减少数据传输。
foldByKey
foldByKey
和 reduceByKey
非常相似,不同之处在于 foldByKey
需要一个初始“零值”来用作每个键的默认值。foldByKey
也会在 map 端进行相同键的局部合并,然后再进行全局合并。
aggregateByKey
aggregateByKey
提供了更高的灵活性,你可以定义不同的返回类型,并提供两个不同的函数—一个用于 map 端的局部聚合(这个函数内部操作的两个参数类型可以是不一样的),另一个用于 reduce 端的全局聚合。
combineByKey
combineByKey
是一个更为一般化的聚合函数,它对于第一个见到的键值对会调用一个函数,转换值类型,然后用转换后的类型进行聚合。combineByKey
需要三个参数:一个用于将输入值转换为一个中间值类型的函数,一个用于在这种中间值类型上执行局部聚合的函数,以及一个用于在相同的中间值类型上执行两两之间的全局聚合的函数。
combineByKey
是这些聚合操作中最复杂的,但也是最强大的,其灵活性允许进行不同的转换和聚合过程,你可以利用它实现其他任何基于键的聚合功能。
总结
reduceByKey
: 适用于当你只需要按键聚合且聚合函数是可交换且关联的情况。foldByKey
: 当你需要一个初始值,同时聚合函数是可交换且关联的。aggregateByKey
: 适用于当你需要对结果类型和初始值执行不同的聚合逻辑或者初始值不是操作的零值时。combineByKey
: 提供最高的灵活性,适合于需要对聚合过程、中间数据类型或者结果值进行完全控制的场景。
在实际使用中,选择哪种函数通常取决于特定数据的形状和需要进行的具体聚合运算。通常我们会根据简单性和性能需求来选择最合适的聚合操作。
Join 操作
Apache Spark 提供了多种数据集(RDD、DataFrame、Dataset)之间的 join 操作,用以实现类似于 SQL 中的表连接。以下是 Spark 支持的几种常见的 join 类型:
- 内连接(Inner Join):
- 使用
join
方法。 - 只有在两个数据集中都有相同 key 的记录时,才会被包含在结果中。
- 类似于 SQL 中的
INNER JOIN
,结果集是两个数据集键值相等的交集部分。
- 使用
- 左外连接(Left Outer Join):
- 使用
leftOuterJoin
方法。 - 结果集包括左侧数据集的所有记录,即使右侧数据集没有对应的 key。
- 在右侧数据集中没有匹配的 key 时,相关联的值会被填充为 None 或者 null。
- 类似于 SQL 中的
LEFT OUTER JOIN
。
- 使用
- 右外连接(Right Outer Join):
- 使用
rightOuterJoin
方法。 - 结果集包括右侧数据集的所有记录,即使左侧数据集没有对应的 key。
- 在左侧数据集中没有匹配的 key 时,相关联的值会被填充为 None 或者 null。
- 类似于 SQL 中的
RIGHT OUTER JOIN
。
- 使用
- 全外连接(Full Outer Join):
- 使用
fullOuterJoin
方法。 - 结果集包含了左右两个数据集的所有记录。
- 如果某个 key 在一个数据集中不存在,则对应的值将被填充为 None 或 null。
- 类似于 SQL 中的
FULL OUTER JOIN
。
- 使用
注意:在实现 join 操作时,Spark 的性能可能会受到数据倾斜的影响,因为 join 操作会引起数据在网络中的传输(Shuffle 过程)。一些策略,比如调整数据分区、广播小数据集、过滤不必要的数据可以用来缓解这一影响。
另外,对于 DataFrame 和 Dataset,在 Spark SQL 中还提供了其他更复杂的连接类型,例如使用 crossJoin
进行笛卡尔积操作,以及支持使用 broadcast
提示对小表使用广播连接优化。
在使用时,应选择符合数据关联特点和性能需要的合适 join 类型。
持久化 Persist 操作
在 Spark 中,持久化是为了缓存或者将 RDD 的计算结果存储到内存或磁盘中,以便在后续的计算中能够更快地获取这些数据。持久化是一种权衡计算速度和内存/存储开销的方式。
一般什么场景下要进行 Persist 操作?
- 迭代计算: 在迭代算法中,如果某个 RDD 在每次迭代中都会被重复使用,那么可以考虑将其持久化,以避免重复计算。
- 宽依赖操作后: 当一个 RDD 有宽依赖(即父 RDD 的一个分区可能对应多个子 RDD 的分区)时,触发 Shuffle,这个时候通常建议将数据持久化,以免重新计算。
- 长链操作后: 如果一个 RDD 参与了一个长链的转换操作,例如多个 map 和 filter,为了加速后续的操作,可以将其持久化。
- Checkpoint 操作前: 在进行 Checkpoint 之前,通常会将需要 Checkpoint 的 RDD 持久化,避免因为 Checkpoint 重新计算。
- 广播变量: 在使用广播变量时,如果广播变量的大小适中,可以考虑将其持久化,以提高在各个节点上的获取速度。
- 容错需求: 持久化也是一种容错机制。当 Spark 在执行任务的过程中发生节点失败,持久化的数据能够帮助在失败的节点上重新计算而不是从头开始。
总的来说,持久化适用于需要重复使用的、计算代价高昂的、或者需要容错的数据。在选择持久化级别时,需要根据数据的使用频率、计算代价以及可用内存和磁盘空间等因素进行权衡。
Cache 和 Persist 的区别
在 Apache Spark 中,cache
和 persist
方法都是用来对 RDD 进行持久化的操作,但是两者之间存在一些区别和特点:
cache
简化的持久化: cache
方法是持久化操作的简化版,它会将 RDD 以默认的存储级别(通常是 MEMORY_ONLY
)持久化。
无需指定存储级别: 使用 cache
方法时,无法指定其他的存储级别,因为它不接受任何参数。
依赖保留: cache
操作后,RDD 的血缘关系(即它的依赖链)保留,不会被切断。
示例代码:
1 |
|
persist
灵活的持久化: persist
方法提供了灵活性,允许开发者选择不同的持久化级别。
多种存储级别: 启用不同的存储级别,如:
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_ONLY_2
MEMORY_ONLY_SER_2
MEMORY_AND_DISK
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER_2
DISK_ONLY
DISK_ONLY_2
MEMORY_ONLY_SER
OFF_HEAP
灵活的存储选项:persist
方法允许用户设置更多的参数,例如,可以指定存储的序列化方式、存储的副本数等。
通用方法:cache
方法实际上是调用了persist
方法的一种特殊情况。
示例代码:
1 |
|
需要注意的是,你提到的 Checkpoint
与 cache
和 persist
是不同的机制。Checkpoint
将数据物理存储到文件系统(如 HDFS)上,并且切断了 RDD 的血缘关系,有助于减少对长血缘链的依赖,避免在出现故障时需要过长时间的恢复过程。而 cache
和 persist
通常会保持血缘关系。
建议使用:
- 使用
checkpoint()
的 RDD 通常也应该cache()
。这是因为执行checkpoint()
操作时,Spark 会重新计算 RDD 和所有的依赖,而cache()
可以让 Spark 在 checkpoint 时利用缓存的数据。 - 如果没有特定的存储级别要求,
cache
方法是方便的。 - 如果需要根据内存使用情况和应用的特定需要来微调数据存储级别,应该使用
persist
方法。
在大多数场景下,如果您不需要特殊配置存储级别,cache
方法足够使用。如果需要更细粒度的控制,或者运行在资源受限的环境中,那么 persist
方法可以提供更多的选项。
RDD 的缓存级别
在 Spark 中,RDD 的缓存级别(Storage Level)决定了 RDD 的缓存方式。以下是常见的 RDD 缓存级别:
- NONE:
- 什么类型都不缓存。
- DISK_ONLY:
- 将 RDD 的分区数据存储到磁盘上,以便需要时可以重新计算。
- DISK_ONLY_2:
- 与 DISK_ONLY 相同,但是存储两个副本。
- MEMORY_ONLY:
- 将 RDD 的分区数据存储在内存中,采用反序列化的方式存储。如果 RDD 的内容无法完全存储在内存中,剩余的分区将在以后需要时重新计算,不会刷到磁盘上。
- MEMORY_ONLY_2:
- 与 MEMORY_ONLY 相同,但是存储两个副本。
- MEMORY_ONLY_SER:
- 将 RDD 的分区数据存储在内存中,采用序列化的方式存储。这种序列化方式以字节数据存储每个分区的内容,节省内存空间,但会增加 CPU 开销。
- MEMORY_ONLY_SER_2:
- 与 MEMORY_ONLY_SER 相同,但是存储两个副本。
- MEMORY_AND_DISK:
- 将 RDD 的分区数据存储在内存中(采用反序列化方式),如果内存不足,则将剩余的分区存储到磁盘上。
- MEMORY_AND_DISK_2:
- 与 MEMORY_AND_DISK 相同,但是存储两个副本。
- MEMORY_AND_DISK_SER:
- 将 RDD 的分区数据存储在内存中(采用序列化方式),如果内存不足,则将剩余的分区存储到磁盘上。
- MEMORY_AND_DISK_SER_2:
- 与 MEMORY_AND_DISK_SER 相同,但是存储两个副本。
这些缓存级别允许用户根据内存、磁盘和副本需求来调整 RDD 的缓存方式,以平衡性能和资源消耗。
Shuffle 机制
Apache Spark Shuffle 过程是处理大规模数据集时的一个关键环节。它发生在需要跨分区重新分配数据的操作中,如 reduceByKey
或 join
。有效地执行 Shuffle 对提升性能至关重要,尤其是因为它可能涉及大量数据在节点间的传输。以下是对 Spark Shuffle 过程和其不同实现方式的总结整理:
Shuffle 过程
- Map 阶段: 数据经过 map 操作被处理,每个 Executor 或 Task 创建多个 Shuffle 文件。
- 溢写与排序: 算子在内存缓冲区满之后将数据溢写到磁盘,并可能在此过程中进行排序。
- Shuffle 获取: Reduce 阶段前,每个 Reduce 任务跨网络获取必要的分区数据。
- Reduce 阶段: 每个 Reduce 任务处理对应的数据,并可能执行聚合操作。
- 结果输出: 最终结果输出,可能作为新数据集分区或存储在外部系统。
Shuffle 实现方式
Hash Shuffle:
- Map 阶段: 每个 Map Task 维护多个输出文件,每个 Reduce Partition 一个。
- Read 阶段: Reduce Task 执行时需要从所有相关 Map Task 拉取数据,导致开销较大。
Sort Shuffle:
- Map 阶段: 数据按 Partition ID 和 Key 排序后写入一个文件,减少文件数量。
- Read 阶段: 使用
ExternalAppendOnlyMap
合并数据,减少内存使用,避免 OOM。
Unsafe Shuffle:
- Map 阶段: 采用二进制格式存储数据,减少内存占用和 GC 压力。
- Read 阶段: 支持 Serializer relocation,避免频繁序列化和反序列化。
在不同 Spark 版本中的优化
- 自 Spark 1.2 起,Sort Shuffle 变为默认的实现方式。
- Spark 2.1 引入不同的 Shuffle Writer 如
BypassMergeSortShuffleWriter
、SortShuffleWriter
和UnsafeShuffleWriter
,取决于是否启用了 map 端的组合以及 Serializer 是否支持 relocation。
Shuffle 实现方法的选择基于现场情况的资源和性能考量。例如,UnsafeShuffleWriter
由于其高效的内存利用和快速的数据处理,在内存丰富的环境中通常是首选。然而,不同的场景可能需要不同的 Shuffle 策略。
在开发 Spark 应用时,开发者会积极寻找方法来减少 Shuffle 的影响,因为 Shuffle 不仅会增加执行时间,也可能造成网络和 IO 的瓶颈。可能的优化策略包括调整 partition 数量、使用本地化数据操作以及选择更高效的算子来减少数据传输。
Hadoop 和 Spark 的 Shuffle 对比
Shuffle 相同点:
- 数据分区和传输: 在 shuffle 阶段,Hadoop 和 Spark 都需要对 Mapper 输出的数据进行分区,并将不同分区的数据传输给对应的 Reducer。这是为了保证相同 key 的数据被发送到同一个 Reducer 节点,以进行后续的聚合和处理。
- Reducer 处理: 在 shuffle 阶段,Reducer(或者 Spark 中的后续 ShuffleMapTask 或 ResultTask)都会对传入的数据进行处理,可能涉及到数据的聚合、排序等操作,最终执行 Reduce 操作或后续的一系列操作。
Shuffle 差异点:
排序机制:
- Hadoop MapReduce: 基于排序的机制,进入 combine () 和 reduce () 的记录需要先进行排序。这允许处理大规模数据,因为输入数据可以通过外部排序实现。
- Spark: 默认使用基于哈希的方式,通常使用 HashMap 对 shuffle 过来的数据进行聚合,避免了提前排序。Spark 1.2 版本之后,可以将 Shuffle Manager 设置为 sort,从而对数据进行排序。
实现方式:
- Hadoop MapReduce: 明确划分为多个阶段,如 map ()、spill、merge、shuffle、sort、reduce ()等。每个阶段负责特定的功能,可以逐个阶段按照过程式编程思维进行实现。
- Spark: 没有明确划分为不同阶段的功能,只有不同的 stage 和一系列的 transformations。一些操作,如 spill、merge、aggregate 等,需要隐含在 transformations 中。Shuffle write 和 shuffle read 的处理逻辑需要在作业的逻辑或物理执行图中加入相应的处理逻辑。
总体而言,虽然有一些共通的点,Hadoop 和 Spark 在 shuffle 阶段的设计和优化思路上存在一些差异,体现在排序机制、功能划分和实现方式等方面。
Parition 和 Block 的关系
在 Spark 中,Partition 和 Block 是两个不同的概念,它们在数据处理中有不同的角色和作用。
Block(HDFS Block):
- 定义: 在 Hadoop 分布式文件系统(HDFS)中,Block 是文件存储的最小单位。当一个文件被存储到 HDFS 时,它会被分成一个个大小相等的块,这些块是 HDFS 的物理存储单元。
- 特点: HDFS Block 的大小是固定的,通常设置为默认的 128MB 或 256MB。这种块的设计旨在实现分布式存储和数据冗余。每个块都有多个副本,分布在不同的节点上,以确保数据的可靠性和容错性。
Partition(Spark Partition): - 定义: 在 Spark 中,Partition 是指 Spark 中弹性分布式数据集(RDD)的最小逻辑分区。一个 RDD 可以分为多个 Partition,每个 Partition 包含了数据集的一个子集。
- 特点: RDD 的 Partition 大小不是固定的,而是根据数据的大小和分布情况动态确定。在 Spark 中,Partition 的数量和分布取决于执行的转化操作和初始数据的分块方式。Partition 的设计是为了实现数据的分布式计算。
关联关系: HDFS Block 与 Spark Partition 之间的关联是通过 Spark 读取 HDFS 文件的方式建立的。当 Spark 从 HDFS 中读取数据时,它会以 HDFS Block 为基本单位将数据分割成 Partition,每个 Partition 对应一个 HDFS Block 的数据。
总体来说,HDFS Block 主要关注数据的物理存储和冗余,而 Spark Partition 主要关注数据的逻辑划分和分布式计算。在 Spark 中,Partition 是构建 RDD 的基本单元,而 HDFS Block 是构建 HDFS 存储系统的基本单元。两者在数据处理中有不同的目的和职责。
Partitioner 的种类
在 Apache Spark 中,Partitioner
负责定义如何将键值对 RDD 的元素按键分派到 RDD 的各个分区中。内置的 Partitioner
用来控制数据如何进行 shuffle。Spark 提供了两种默认的 Partitioner
:
HashPartitioner
- 原理:
HashPartitioner
是 Spark 中最常用的分区器。它使用 Java 中的Object.hashCode
方法来计算数据项的哈希码,并对哈希码进行取模运算得到分区编号。其公式如下:partition = key.hashCode() % numPartitions
,这里的numPartitions
是分区的数量。 - 使用场景:
HashPartitioner
适用于需要将数据凭借键快速均匀分布到分区中时。它通常用于reduceByKey
和groupByKey
这类依赖于键分组数据的操作。
RangePartitioner
- 原理:
RangePartitioner
通过对键进行排序后分配到各个分区。它首先抽样输入数据,以便了解其分布,然后根据抽样结果确定分区间的范围界限,以实现数据均匀分布于各个分区。 - 使用场景:
RangePartitioner
适用于当数据处理需要在排序的基础上进行时。sortByKey
等需要有序分区的操作通常使用RangePartitioner
。
Spark 用户还可以自定义 Partitioner
。自定义分区器允许开发者根据应用的需求,定义特定的数据分区逻辑。例如,根据特定条件或数据属性来分区、根据地理位置或其他业务逻辑要求将数据集中到特定节点等高级用法。
自定义 Partitioner
需要继承 Partitioner 类并重写两个方法:
numPartitions
: 返回分区数量。getPartition(key: Any)
: 返回给定键所处的分区索引。
自定义分区器的例子如下:
1 |
|
HashParitioner
HashPartitioner
的弊端主要在于数据倾斜的问题,这是由于哈希函数的不完美性和数据分布的不均匀性导致的。具体的问题和弊端包括:
- 数据倾斜:
HashPartitioner
使用 key 的哈希码来确定分区,如果某些键的哈希码分布不均匀,即使数据分布在原始 RDD 中是均匀的,最终在分区后也可能导致某几个分区的数据量远远大于其他分区,造成数据倾斜。 - 性能问题: 由于数据倾斜,导致部分分区的数据量较大,从而影响了并行性和任务执行的效率。一些任务需要等待数据较多的分区完成后才能继续,降低了整体的计算速度。
- 内存消耗: 对于数据量极大的分区,可能会导致内存不足,影响任务的执行。
- 可能的任务失败: 在数据倾斜的情况下,极端的分区可能会导致某些任务需要处理大量数据,容易导致任务运行失败。
为了解决数据倾斜问题,可以考虑使用其他类型的分区器,如 RangePartitioner
、CustomPartitioner
等,或者在特定情况下手动处理数据倾斜,采用一些特殊的处理方式。
RangePartitioner
Spark 中的 RangePartitioner 是用于键值对 RDD 的一种分区器,其目标是将数据分散到不同的分区中,同时确保分区之间的数据有序。
RangePartitioner 的工作原理主要基于下述几个步骤:
- 范围确定: 首先需要确定每个分区应该包含哪个范围内的键。为了做到这点,RangePartitioner 使用了水塘抽样(Reservoir Sampling)算法对 RDD 中的元素进行抽样,得到一个近似有序的样本集合。
- 范围边界计算: RangePartitioner 通过样本集合中的元素来确定分区范围的边界(即
rangeBounds
),这些边界是有序的。根据设定的分区数量,这些边界将整个键的数据范围划分成连续的区间,每个区间对应一个分区。 - 分区映射: 当进行数据分区时,每个键值对会根据其键与已确定的边界进行比较,映射到不同的分区。键小于第一个边界的数据将被映射到第一个分区,键介于第一个边界和第二个边界之间的数据将映射到第二个分区,以此类推,这确保了分区间是有序的。
- 分区内顺序: 分区内的数据并不保证有序。RangePartitioner 只保证全局范围内数据的有序性,即分区之间是有序的。在每个分区内部,数据的顺序是不定的。如果有必要的话,需要在每个分区内部进一步对数据进行排序。
RangePartitioner 的一个重要特点是平衡性,通过抽样和边界的确定,它尽量使得每个分区中的数据量大体上保持均匀,从而促进任务的均衡执行,避免某些节点因数据过多而成为瓶颈。
使用 RangePartitioner 的一个常见场景是对数据进行排序处理,特别是在需要有序处理大量数据、进行范围查询或者其他排序操作的情况下。由于它根据键的实际分布来确定分区边界,这在很大程度上减少了数据倾斜的风险。
RDD、DataFrame、DataSet、DataStream 对比
Spark 提供了四种不同的抽象数据类型:RDD(Resilient Distributed Datasets)、DataFrame、DataSet 和 DataStream,它们具有不同的特点和用途。以下是它们之间的区别:
RDD(Resilient Distributed Datasets):
- RDD 是 Spark 最早引入的 API,它是一个弹性分布式数据集。RDD 是一个不可变的分布式对象集合,可以通过一系列的转换操作进行处理和转换。
- RDD 提供了容错性和可靠性,可以在内存中缓存数据,并支持高级的数据操作和函数式编程。
- RDD 是强类型的数据结构,需要在编写程序时指定元素的类型,并通过编写函数来进行处理。
DataFrame: - DataFrame 是以表格形式组织的分布式数据集合,类似于关系型数据库中的表,每列有具体的名称和类型,可以进行类 SQL 的查询和操作。
- DataFrame 提供了更高级的抽象和优化,支持更丰富的数据处理和查询功能,例如过滤、聚合、连接等。
- DataFrame 是弱类型的数据结构,可以使用 SQL 语句或 DataFrame API 进行操作,而不需要指定具体的数据类型。
DataSet: - DataSet 是比 RDD 更高层次的抽象,它是 RDD 和 DataFrame 的结合体,提供了 RDD 的强类型和 DataFrame 的高级查询功能。
- DataSet 提供了类型安全和更高性能的数据操作,同时支持类 SQL 的查询和函数式编程。
- DataSet 是强类型的数据结构,需要在编写程序时指定元素的类型,并通过编写函数来进行处理。
DataStream: - DataStream 是 Spark Streaming 中的抽象,用于处理实时流式数据。
- DataStream 将连续的数据流视为离散的事件流,可以进行窗口操作、聚合、过滤等实时流处理操作。
- DataStream 提供了事件时间和处理时间两种处理模式,以及窗口操作和状态管理等功能。
总的来说,RDD 是最基本的抽象数据类型,DataFrame 和 DataSet 提供了更高级和优化的数据操作能力,DataStream 专门用于处理实时流式数据。用户可以根据具体的需求和场景选择适合的数据抽象类型。
MapReduce 对应的算子
在 Hadoop MapReduce 中,Mapper 和 Reducer 阶段相当于 Spark 中的以下算子:
- Mapper 阶段: 相当于 Spark 中的
map
算子。Mapper 用于将输入数据集的每个元素映射为键值对。 - Reducer 阶段: 相当于 Spark 中的
reduceByKey
算子。Reducer 用于对 Mapper 输出的键值对进行聚合操作,具体的聚合逻辑由开发者定义。
需要注意的是,虽然有相似性,但也存在一些区别,特别是在数据处理模型和执行引擎上。 Spark 的 reduceByKey
算子和 Hadoop MapReduce 的 Reducer 阶段都涉及到数据的聚合,但 Spark 更灵活,支持更多复杂的操作,并且具有更高的性能,因为 Spark 在内存中进行数据处理,避免了不必要的磁盘读写。
RDD 懒加载
RDD 的懒加载(Lazy Evaluation)是指在 Spark 中,对 RDD 的转换操作不会立即执行,而是等到遇到行动(Action)操作时才会真正触发计算。这种延迟计算的机制有一些优势:
- 节省计算资源: 如果一个 RDD 经历了多个转换操作,但最终并没有被使用,那么就没有必要进行这些转换的计算,从而节省了计算资源。
- 优化执行计划: Spark 可以在执行计划中进行优化,例如合并相邻的转换操作,选择更优的执行路径,以提高整体计算性能。
- 避免重复计算: 在需要多次使用同一个 RDD 时,懒加载可以避免重复计算,因为计算结果会被缓存起来供后续使用。
- 按需计算: 懒加载使得 Spark 可以根据实际需要按需计算,而不是一开始就计算所有转换操作,从而更加灵活。
懒加载的实现基于 RDD 的有向无环图(DAG,Directed Acyclic Graph)。当创建一个 RDD 后,它只是 DAG 中的一个节点,而不是立即执行的计算。只有当 Action 操作被调用时,Spark 才会从 DAG 的起始节点开始逐步计算,直到达到 Action 节点,最终生成结果。
这种懒加载的特性使得 Spark 更加灵活和高效,能够优化计算过程,根据实际需要进行计算,从而提高整体的性能和资源利用率。
Spark Streaming
其他
序列化与反序列化
序列化和反序列化是为了在存储、传输和处理数据时实现一种有效的格式转换。下面是一些关于为什么要进行序列化和反序列化的原因:
数据持久化: 序列化允许将数据转换为字节流或其他格式,以便将其保存在磁盘上或通过网络传输。这使得数据可以长期存储,并在需要时重新加载。
数据传输: 在分布式系统中,数据需要在不同的节点之间传输。序列化将数据转换为字节流,可以更高效地在网络上传输,减少网络带宽的使用。
跨平台交互: 序列化允许在不同的编程语言和平台之间交换数据。通过将数据序列化为通用格式,不同系统可以更容易地相互通信和交换信息。
数据格式压缩: 序列化后的数据通常比原始数据更紧凑,从而减少了存储空间的需求。这在大规模数据存储和处理中尤为重要,可以降低成本和提高效率。
持久化存储: 序列化允许对象在不同时间点之间进行持久化存储。通过将对象序列化为文件或数据库中的字节流,可以在需要时重新加载对象的状态。
数据传递: 在分布式计算环境中,需要将数据从一个节点传递到另一个节点。序列化可以将数据转换为字节流,使其易于传递,并在接收方进行反序列化以还原数据。
尽管序列化提供了这些优点,但它也有一些缺点。其中一个主要的缺点是序列化和反序列化过程可能会消耗大量的 CPU 资源,特别是在处理大量数据时。因此,在设计系统时需要权衡这些方面的因素,以确保达到最佳性能。
关联 Kafka
从Kafka 获取数据是大数据实时处理中的常见需求。Apache Kafka 是一个分布式流媒体平台,可以用来发布和订阅流数据。在 Spark 中有两种主要方式可以从 Kafka 获取数据:
基于 Receiver 的方式
这种方式使用了一个叫做 Receiver 的组件来获取数据。Receiver 基于 Kafka 的高层次 Consumer API 实现,即传统的 Kafka Consumer。Receiver 接收数据并将其存储在 Spark 的内存中(在 Executor 中),然后 Spark Streaming 启动的 job 将处理这些数据。这种方式的缺点是它需要占用 Spark 的 Executor 资源来接收数据,而且在某些情况下可能会丢失数据(例如,当 Receiver 所在的节点失败时)。
基于 Direct 的方式
这种方式不使用 Receiver,而是直接连接到 Kafka,有时也称为 Direct Kafka Approach 或 Direct API。在 Spark 1.3 版本引入,它允许 Spark Streaming 创建一组定期的批次作业,并直接从 Kafka 中查询每个批次的数据。每个定期作业定义了从 Kafka 消费的范围(offset 范围)。任务启动时,Spark 使用 Kafka 的简单 Consumer API 从指定效果的数据开始消费。这种方法的好处是减少了资源占用,提高了吞吐量,并且通过精确地控制消费的 offset,可以确保数据绝不丢失(至少一次传递保证)。
在实施上述两种方式时,都需要使用 Spark Streaming Kafka 集成包,并在编写 Spark 应用程序时进行适当配置。基于 Direct 的方式是当前实时处理中更加推荐的方式,因为它提供了更好的性能和容错性。
Spark on Yarn Cluster 模式下 ApplicationMaster 和 Driver 的作用
是的,在 Spark on YARN 的 Cluster 模式下,Spark 的 Driver 组件运行在 ApplicationMaster 的进程中。在这种模式下,ApplicationMaster 的主要职责是与 YARN 的 ResourceManager 进行通信以协商和申请资源,启动和监控 Executor 的运行状态,以及处理由 ResourceManager 分配的容器资源。
更具体来说,ApplicationMaster 的核心工作包括:
资源协商: ApplicationMaster 首先向 ResourceManager 请求资源,这些资源是根据 Spark 作业来决定,如 Executor 的大小和数量。
任务调度与监控: 一旦 ResourceManager 分配了资源,ApplicationMaster 会启动容器并在其中运行 Spark Executor。在作业执行期间,ApplicationMaster 也会监控所需资源的实时情况,并可根据需要请求更多资源或释放不再需要的资源。
Driver 角色: 在 Cluster 模式下,Spark 的 Driver 与 ApplicationMaster 的进程合并。Driver 是负责任务的执行和任务结果的聚合,并将用户程序转换为实际的任务,这些任务会被分派到不同的 Executor 进行执行。Driver 还负责重试失败的任务和执行诸如任务调度和结果收集等管理工作。
选择 Cluster 模式的一个重要理由是,它允许 Driver 在 YARN 集群内部运行,这将为 Driver 提供更好的隔离性,并利用 YARN 提供的容错和快速恢复功能。这与 Client 模式相反,在 Client 模式下,Driver 运行在用户的客户端机器上,与集群外部通信,管理作业的生命周期。
综上,Spark on YARN 的 Cluster 模式通过整合 ApplicationMaster 和 Driver 进程,简化了资源管理,增强了系统的稳健性,并提升了管理的效率。
Spark on Yarn 中 Container 的类型
在 Apache YARN(Yet Another Resource Negotiator)上运行的应用程序大致会使用两种类型的 Container,每种 Container 都有其特定的职责和用途:
ApplicationMaster Container: 当运行一个应用程序时,YARN的ResourceManager首先启动一个ApplicationMaster Container。这个Container是该应用程序的第一个并且唯一的Container,其作用是为应用程序的其他部分协商资源(如内存、CPU核数)。ApplicationMaster是应用程序的主控制点,它负责申请所有必要的资源,并监督应用程序的执行进度。用户在提交应用程序时可以指定ApplicationMaster所需要的资源大小、位置等信息。
任务(Executor)Container: 这些Container是应用程序的工作单元,由ApplicationMaster从ResourceManager那里申请得到,并由NodeManager在各个节点上启动。它们负责执行具体的计算和存储任务,如Spark中的Executor运行在这些Container中。ApplicationMaster会根据计算需求和资源的可用性来动态申请任务Container,它们可以在任意时刻被启动或停止。
在运行Spark作业的上下文中,ApplicationMaster负责启动必要数量的Executor Container,并保证作业顺利运行。各Executor Container将执行Spark作业中的不同任务,并在处理完成之后将结果返回给Driver程序(在Cluster模式下是运行在ApplicationMaster Container中的)。YARN负责在集群中的节点上调度和管理这些Container,包括监控它们的生命周期、重新调度在失败节点上的任务等。这样的设计允许高效和灵活地管理集群资源,提供了良好的隔离性,并支持大规模数据处理应用的需要。
Spark 2.0 为什么放弃了 Akka 而用 Netty
在 Spark 2.0 中放弃 Akka 而采用 Netty 的决策是出于几个考虑:
- 版本兼容性问题: 在 Spark 1. X 版本中,由于 Akka 不同版本之间无法互相通信,用户必须使用与 Spark 完全一样的 Akka 版本。这给用户带来了版本兼容性的困扰。采用 Netty 可以避免这个问题,因为 Netty 具有更好的版本兼容性。
- 配置冲突问题: Spark 的 Akka 配置是为了调优 Spark 自身而设计的,但用户可能在自己的代码中也使用了 Akka,并配置了特定的参数。这可能导致 Spark 的 Akka 配置与用户代码中的 Akka 配置冲突。采用 Netty 可以避免这种冲突,因为 Spark 和用户代码使用了不同的网络通信框架。
- 特性使用和定制: Spark 在使用 Akka 时,并未充分利用 Akka 提供的全部特性,而只使用了其中的一小部分。这些特性相对较少,而且很容易用其他方式实现。采用 Netty 可以让 Spark 更加轻量化,去除不必要的依赖,简化代码结构,并降低维护的复杂性。
- 灵活性和维护性: 由于 Spark 对 Akka 的使用量较少,放弃 Akka 可以提高代码的灵活性和维护性。采用 Netty 后,Spark 可以更自由地进行网络层的调整和优化,同时更容易定位和修复潜在的网络相关的问题。
总的来说,采用 Netty 取代 Akka 是出于简化和优化的考虑,使得 Spark 在网络通信方面更加灵活、稳定,并减轻了用户的版本管理负担。
Yarn 中的 Container 是由谁负责销毁的,在 Hadoop Mapreduce 中 Container 可以复用么?
在 YARN 中,Container 的销毁由 ApplicationMaster 负责通知 Node Manager 进行。Hadoop MapReduce 作业中的 Containers 不支持复用,任务完成后容器就被销毁。而在 Spark on YARN 中,Containers 以 Executors 的形式可以被复用,执行多个任务,提高资源利用率。
在没有启动 Spark 的 Master 和 Worker 服务的情况下,是否仍然能够运行 Spark 应用程序?
可以。Spark 程序能够在不启动 Spark 自身集群管理服务(Master 和 Workers)的情况下运行,因为它可以在其他资源管理器的管理下运行,如 YARN 或者 Mesos。在使用 YARN 时,资源管理由 YARN 的 ResourceManager 和 NodeManager 负责,Spark 仅运行在 YARN 分配的容器内部的 Executors 中。因此,只要 Spark 程序能在 JVM 上运行,并且集群中有支持的资源管理器,就可以执行 Spark 作业。