数据仓库
数据仓库
数据仓库分层
数据仓库的分层是一种重要的架构策略,主要基于以下考虑:
提高处理效率:
分层设计允许数据进行预处理,各个层次可以根据需要对数据进行汇总、聚合或转换,为上层应用提供经过优化的数据。这种方法通过牺牲存储空间(存储预处理和汇总后的数据)来换取处理时间,从而提高查询的响应速度,提升用户体验。减少对原始源的依赖:
数据仓库不仅仅是一个数据存储点,而是一个独立的、专为分析设计的系统。通过在数据仓库中保留数据的历史快照,即使源系统发生更改,分析能力也不会受到影响。同时,如果业务规则发生变化,只需要更新受影响的层次,而无需重新处理整个数据仓库的数据。简化数据管理:
将数据处理过程分解到不同的层次,可以简化数据管理。每一层负责特定的数据处理任务,使得数据流清晰易懂。当数据出现问题时,可以迅速定位到具体的层次和处理步骤以进行调整。
通常数据仓库分层至少包括以下层次:
- 数据提取层(Staging Area):在这一层对数据进行清洗、转换,以准备加载到下一层。
- 基础数据层(Data Foundation/Baseline Layer):存储历史数据和细节数据,通常保存原始的粒度。
- 聚合数据层/汇总层(Aggregation Layer/Summary Layer):根据分析需求对数据进行汇总和聚合,以提高查询效率。
- 应用层/数据骰子层(Application Layer/Data Mart Layer):按照特定主题或业务需求组织数据,面向特定的用户群体或业务过程。
- 接入层(Access Layer/Presentation Layer):用户接入层,提供查询和报告服务。
每个层都解决特定的数据转换和处理需求,使得整个数据仓库设计灵活而且可维护,对于长期的维护和发展至关重要。
ODS 层
- 保持数据原貌不做任何修改,起到备份数据的作用。
- 数据采用压缩,减少磁盘存储空间(例如:原始数据100G,可以压缩到10G 左右)
- 创建分区表,防止后续的全表扫描
- 压缩采用 Snappy,存储采用 orc
DWD 层
在数据仓库中,DWD(Data Warehouse Detail)层是负责处理细节数据的层次,主要任务是从业务系统的数据源中抽取、清洗和加工数据,以满足数仓模型的需求。以下是在 DWD 层进行维度建模和数据清洗时的一些建议:
数据清洗:
数据清洗是 DWD 层的关键任务之一,确保数据的完整性和准确性。清洗过程包括以下几个方面:
- 数据抽取:
从业务系统中抽取原始数据,通常使用 ETL 工具或自定义脚本进行抽取,以确保数据的完整性。
- 数据清洗:
- 空值去除: 去除包含空值的记录,确保数据的完整性和质量。
- 过滤无意义数据: 去除核心字段无意义的数据,例如订单表中订单 ID 为 null,支付表中支付 ID 为空的记录。
- 脱敏: 对敏感数据如手机号、身份证号进行脱敏处理,保护用户隐私。
- 维度退化和降维: 对业务数据传过来的表进行维度的退化和降维,以适应数据仓库的模型。
- 数据一致性处理:
将用户行为宽表和业务表进行数据一致性处理,确保数据在不同业务系统中的一致性。
- 清洗手段:
- 使用 SQL 进行数据清洗,通过编写 SQL 语句过滤、清除不符合条件的数据。
- 使用 ETL 工具(如 Kettle)进行数据清洗,通过可视化的方式设计清洗流程。
- 使用编程语言如 Python 进行清洗,通过自定义脚本实现清洗逻辑。
- MapReduce 或 RDD 编程模型也可以用于大规模数据的清洗和处理。
维度建模
DWD层需构建维度模型,一般采用星型模型,呈现的状态一般为星座模型。
维度建模一般按照以下四个步骤:
选择业务过程→声明粒度→确认维度→确认事实
- 选择业务过程:
在 DWD 层维度建模的第一步是选择业务过程,通常每个业务过程对应一张事实表。例如,可以选择下单业务、支付业务、退款业务、物流业务等,每条业务线对应一张事实表。如果业务较为简单,也可以考虑选择所有业务线。
- 声明粒度:
声明粒度是非常重要的一步,它定义了事实表中的一行数据代表什么。建议尽可能选择最小粒度,以适应不同的需求。例如:
- 对于订单,可以选择每个商品项作为下单事实表中的一行,粒度为每次下单。
- 对于统计订单次数,可以选择每周的订单次数作为一行,粒度为每周。
选择最小粒度的好处是可以应对各种需求,避免后续无法统计细粒度指标的问题。
- 确认维度:
维度主要描述了业务事实的“谁、何处、何时”等信息。常见的维度包括时间维度、用户维度、地区维度等。在确认维度时,需要根据业务的特点选择合适的维度,确保维度能够提供充分的上下文信息。
- 确认事实:
在 DWD 层,事实通常指业务中的度量值,如订单金额、下单次数等。根据业务过程的特点,构建最细粒度的明细层事实表。同时,可以考虑对事实表进行适当的宽表化处理,以便更好地支持后续的查询和分析。
通过综合运用以上建议,可以在 DWD 层构建出符合维度建模要求的数据结构,并对原始数据进行有效的清洗,为后续数据处理和分析提供高质量、一致性的数据基础。
DWS 层
DWS层统计各个主题对象的当天行为,服务于DWT层的主题宽表。如图所示,DWS层的宽表字段,是站在不同维度的视角去看事实表,重点关注事实表的度量值,通过与之关联的事实表,获得不同的事实表的度量值。
在数据仓库中,DWS(Data Warehouse Summary)层是数据仓库的汇总层,主要负责对清洗和加工后的数据进行汇总和聚合,以生成满足业务需求的宽表。以下是 DWS 层常见的工作内容:
宽表设计:
- 设计包含 3-5 张宽表,每张宽表负责处理 100-200 个指标,覆盖业务中 70% 以上的需求。
- 具体宽表的设计可以根据业务需求,例如用户行为宽表、用户购买商品明细行为宽表、商品宽表、购物车宽表、物流宽表、登录注册宽表、售后宽表等。
字段数量和宽表宽度:
- 最宽的宽表通常是用户行为宽表,大概包含 60-100 个字段。这些字段涵盖了用户在平台上的各种行为和交互信息。
- 其他宽表的字段数量根据业务需求而定,一般包括多个维度和指标,以支持复杂的数据分析和报表生成。
数据汇总和聚合:
- 对清洗和加工后的数据进行汇总和聚合,生成宽表的数据。
- 使用 SQL 或类似的数据处理语言进行数据的聚合操作,例如对销售数据进行按天、按周、按月的统计。
数据存储和压缩:
- 选择适当的存储格式,例如 Parquet 或 ORC,以优化查询性能。
- 使用压缩算法,如 Snappy,减小存储空间占用,提高查询效率。
性能调优:
- 对宽表的查询性能进行调优,可以通过合理的索引、分区等手段提高查询效率。
- 考虑数据分桶等策略,以加速查询速度。
业务指标计算:
- 根据宽表的设计,计算各种业务指标,包括但不限于销售额、订单量、用户活跃度等。
总体而言,DWS 层的工作是将清洗、加工后的数据以宽表的形式提供给业务分析和报表系统,以支持业务决策和数据挖掘。
事实表的类型
事实表是数据仓库中的核心表,记录了业务过程中的度量和指标。根据数据的特性和业务需求,事实表可以分为不同类型,主要有以下几类:
事务事实表(Transactional Fact Table):
- 以事务为单位记录业务事件,保存最原子的数据。
- 数据粒度通常是每个事务记录一条,适用于记录业务中的具体事务事件。
- 例如,订单明细表是一个事务事实表,每一行记录代表一个订单中的商品购买事务。
周期快照事实表(Periodic Snapshot Fact Table):
- 以规律性、可预测的时间间隔记录事实。
- 统计的是每个时间段内的度量统计,每个时间段对应一条记录。
- 适用于周期性统计和报告,例如,每月的销售总额、每周的库存统计。
累积快照事实表(Accumulating Snapshot Fact Table):
- 记录不确定的周期内的数据,通常覆盖事务或产品的生命周期。
- 包含多个日期字段,记录关键时间点的度量值,用于跟踪整个生命周期的变化。
- 适用于需要追踪整个生命周期中关键阶段的指标,例如,跟踪客户在销售漏斗中的不同阶段。
非事实型事实表(Non-additive Fact Table):
- 不包含可加性的度量事实,通常只有多个维度外键,用于跟踪事件或说明某些活动的范围。
- 分为两类:
- 事件型事实表:用于跟踪事件,每行表示一个事件的发生。
- 范围型事实表:用于说明某些活动的范围,每行表示一个活动范围的情况。
- 适用于记录业务中不可加性的信息,例如,学生注册事件、促销范围。
每种类型的事实表都有其特定的应用场景,根据业务需求选择合适的事实表类型有助于构建更有效的数据仓库。
数据建模常见模型
1. 星型模型
- 最基本的数据仓库模型,包含一个或多个事实表和维度表。
- 事实表:位于模型中心,存储量化的业务数据,包含了所谓的“事实”(比如销售额、利润等),以及维度表的外键。
- 维度表:围绕事实表,描述事实表中度量的上下文信息,如时间、地点、产品、人员等。
- 星型模型主要优点是查询性能良好,易于理解和管理。
- 维表只和事实表关联,维表之间没有关联;
- 每个维表主键为单列,且该主键放置在事实表中,作为两边连接的外键;
- 以事实表为核心,维表围绕核心呈星形分布。
2. 雪花模型
雪花模式 (Snowflake Schema)是对星形模式的扩展。雪花模式的维度表可以拥有其他维度表的,虽然这种模型相比星型更规范一些,但是由于这种模型不太容易理解,维护成本比较高,而且性能方面需要关联多层维表,性能比星型模型要低。
- 星型模型的变种,与星型模型不同,维度表可以进一步归一化分解为更小的维度表。
- 这种模型通过归一化减少了数据冗余,并可能提高了某些查询的效率。
- 但是复杂度较高,维护成本增加,且可能会因为增加的表连接而降低查询性能。
3. 星座模型
星座模式是星型模式延伸而来,星型模式是基于一张事实表的,而星座模式是基于多张事实表的,而且共享维度信息。前面介绍的两种维度建模方法都是多维表对应单事实表,但在很多时候维度空间内的事实表不止一个,而一个维表也可能被多个事实表用到。在业务发展后期,绝大部分维度建模都采用的是星座模式。
- 星座模型是包括多个星型模型或雪花模型的更复杂的结构。
- 它支持多个事实表,这些事实表可能共享一些维度表。
- 星座模型使得能够在一个数据仓库里分析不同的业务过程。
在实际应用中,数据仓库设计可能会根据特定业务需求和查询模式的不同选用不同的数据模型。星型模型由于其简单和直观,便于理解,是最常见的选择,特别是在性能和易用性是主要关注点时。雪花模型则可能在某些需要高度归一化数据的场景中更为合适,尽管可能会牺牲一些性能和简易性。
数据建模不仅限于这些传统模型,随着技术发展,例如非关系型数据库和大数据技术的应用,也衍生了诸如文档模型、列存储模型等新型数据模型,这些模型为非结构化或半结构化数据的存储和处理提供了更为灵活的解决方案。
数据漂移解决方案
1. 什么是数据漂移?
数据漂移通常指 ODS(Operational Data Store)表中同一业务日期的数据中包含了前一天或后一天凌晨附近的数据,或者丢失了当天发生的变更数据。这在大部分公司中是一个常见的场景。
2. 如何解决数据漂移问题?
通常有两种解决方案:
a. 多获取后一天的数据,保障数据只多不少:
- 通过获取后一天的数据,确保不会遗漏当天的数据变更。这是一种较为直接的解决方案。
b. 通过多个时间戳字段来限制时间获取相对准确的数据: - 这种方案通过多个时间戳字段来解决数据漂移问题,具体的时间戳字段包括:
- 数据库表中用来标识数据记录更新时间的时间戳字段(假设为 modified_time)。
- 数据库日志中用来标识数据记录更新时间的时间戳字段(假设为 log_time)。
- 数据库表中用来记录具体业务过程发生时间的时间戳字段(假设为 proc_time)。
- 标识数据记录被抽取到时间的时间戳字段(假设为 extract_time)。
数据漂移解决方案步骤:
- 根据 extract_time 进行同步,保障数据不会丢失。
- 根据 modified_time 进行限制同步,确保只获取当天的数据,但存在 modified_time 未更新导致数据遗漏的问题。
- 根据 proc_time 进行限制,但这可能违背 ODS 和业务库保持一致的原则。
细化解决方案:
- 通过 log_time 多同步前一天最后 15 分钟和后一天凌晨开始 15 分钟的数据,确保涵盖可能的数据变更。
- 根据 modified_time 过滤掉非当天的数据,以避免数据的遗漏。
- 根据 log_time 获取后一天 15 分钟的数据,按照主键根据 log_time 做升序排序,以确保获取最接近当天的数据记录变化。
- 最后,将前两步的数据进行全外连接,通过限制业务时间 proc_time 来获取最终所需的数据。
这种方案相对于直接获取后一天的数据,更加细致和灵活,可以较好地应对数据漂移的情况。
维度建模和范式建模的区别
1. 范式建模(3 NF 模型)特点:
- 原⼦性: 数据不可分割。
- 完全依赖: 实体属性完全依赖于主键,不能存在仅依赖主关键字⼀部分属性,即不能存在部分依赖。
- 消除传递依赖: 任何⾮主属性不依赖于其他⾮主属性,即消除传递依赖。
- 降低冗余: 3 NF 的最终⽬的是为了降低数据冗余,保障数据⼀致性。
2. 维度建模特点:
- ⾯向分析场景: 主要关注点在于快速、灵活,能够提供⼤规模的数据响应。
- 星型模型: 由⼀个事实表和⼀组维度表组成,每个维表都有⼀个维度作为主键。事实表居中,多个维表呈辐射状分布在四周,并与事实表关联,形成⼀个星型结构。
- 雪花模型: 在星型模型的基础上,基于范式理论进⼀步层次化,将某些维表扩展成事实表,最终形成雪花状结构。
- 星系模型: 基于多个事实表,共享⼀些维度表。
主要区别:
- 目标不同: 范式建模主要目标是消除冗余、保障数据一致性;而维度建模主要目标是为了适应分析场景,提供快速、灵活的数据响应。
- 结构不同: 范式建模强调规范化,表结构符合 3 NF 规范;维度建模强调星型、雪花型等结构,更注重满足分析查询的需要。
- 应用场景不同: 范式建模更适用于 OLTP(在线事务处理)场景,而维度建模更适用于 OLAP(在线分析处理)场景。
综合比较:
范式建模更适合于需要保持数据一致性、降低冗余的事务型数据库设计,而维度建模更适合于面向分析、报表和数据仓库等决策支持系统的设计。选择建模方法应根据具体业务需求和应用场景来决定。
数据倾斜
数据倾斜的表现
Hadoop 中的数据倾斜表现:
- Reduce 卡住: 一个或多个 Reduce 任务卡在 99.99%,一直无法完成。
- Container 报错 OOM: 各种容器报告 Out of Memory(OOM)错误,表明内存不足。
- 异常的 Reducer 数据量过大: 异常的 Reducer 读写的数据量异常巨大,远远超过其他正常的 Reducer。
- 任务异常表现: 伴随着数据倾斜,任务可能会因为各种原因被终止,例如被强制终止(kill)等,呈现出各种诡异的表现。
Hive 中的数据倾斜表现:
- 发生在 GROUP BY 和 JOIN ON 上: 通常发生在 SQL 查询中的 GROUP BY 和 JOIN ON 操作上,与数据逻辑深度绑定。
Spark 中的数据倾斜表现(包括 Spark Streaming 和 Spark SQL):
- Executor 丢失、OOM、Shuffle 过程错误: Executor 可能丢失,出现 Out of Memory 错误,或者 Shuffle 过程出现错误。
- Driver OOM: Driver 端可能会因为数据倾斜导致 Out of Memory 错误。
- 单个 Executor 执行时间异常长: 一个 Executor 的执行时间明显长于其他正常 Executor,导致整体任务无法顺利完成。
- 任务突然失败: 正常运行的任务突然失败,可能由于数据倾斜导致的异常情况。
数据倾斜是大数据处理中常见的挑战,可能导致任务性能下降、资源浪费、任务失败等问题。在面对数据倾斜时,需要采取一系列的优化和调优手段,例如使用合适的分区策略、倾斜数据的均衡处理、使用合适的数据结构和算法等,以尽可能减轻数据倾斜带来的影响。
数据倾斜产生原因
我们以 Spark 和 Hive 的使用场景为例。
他们在做数据运算的时候会涉及到,count distinct、group by、join on等操作,这些都会触发Shuffle动作。一旦触发Shuffle,所有相同key的值就会被拉到一个或几个Reducer节点上,容易发生单点计算问题,导致数据倾斜。
一般来说,数据倾斜原因有以下几方面:
Key 分布不均匀
建表时考虑不周
我们举一个例子,就说数据默认值的设计吧,假设我们有两张表:
user(用户信息表):userid,register_ip
ip(IP 表):ip,register_user_cnt
这可能是两个不同的人开发的数据表。如果我们的数据规范不太完善的话,会出现一种情况:
user 表中的 register_ip 字段,如果获取不到这个信息,我们默认为 null;
但是在 ip 表中,我们在统计这个值的时候,为了方便,我们把获取不到 ip 的用户,统一认为他们的 ip 为0。
两边其实都没有错的,但是一旦我们做关联了,这个任务会在做关联的阶段,也就是sql的on的阶段卡死。
业务数据激增
比如订单场景,我们在某一天在北京和上海两个城市多了强力的推广,结果可能是这两个城市的订单量增长了10000%,其余城市的数据量不变。
然后我们要统计不同城市的订单情况,这样,一做group操作,可能直接就数据倾斜了。
数据倾斜解决思路
很多数据倾斜的问题,都可以用和平台无关的方式解决,比如更好的数据预处理,异常值的过滤等。因此,解决数据倾斜的重点在于对数据设计和业务的理解,这两个搞清楚了,数据倾斜就解决了大部分了。
- 业务逻辑
我们从业务逻辑的层面上来优化数据倾斜,比如上面的两个城市做推广活动导致那两个城市数据量激增的例子,我们可以单独对这两个城市来做count,单独做时可用两次MR,第一次打散计算,第二次再最终聚合计算。完成后和其它城市做整合。
- 程序层面
比如说在 Hive 中,经常遇到 count(distinct)操作,这样会导致最终只有一个 Reduce 任务。
我们可以先 group by,再在外面包一层 count,就可以了。比如计算按用户名去重后的总用户量:
(1)优化前只有一个 reduce,先去重再 count 负担比较大:
1 |
|
(2)优化后
1 |
|
- 调参方面
Hadoop和Spark都自带了很多的参数和机制来调节数据倾斜,合理利用它们就能解决大部分问题。
- 从业务和数据上解决数据倾斜
很多数据倾斜都是在数据的使用上造成的。我们举几个场景,并分别给出它们的解决方案。
- 有损的方法:找到异常数据,比如ip为0的数据,过滤掉
- 无损的方法:对分布不均匀的数据,单独计算
- 先对key做一层hash,先将数据随机打散让它的并行度变大,再汇集
- 数据预处理
定位数据倾斜代码
Spark 数据倾斜只会发生在 shuffle 过程中。
这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的
某个 task 执行特别慢的情况
当在大数据处理中某个 Task 执行异常缓慢时,可能会出现以下情况,需要进行分析和调优:
- 数据倾斜表现: 通过 Spark Web UI 或本地日志,检查任务执行到的 Stage。数据倾斜可能导致部分任务处理的数据量远远超过其他任务,表现为部分任务运行缓慢,而其他任务正常。
- Task 运行时间和数据量: 观察每个 Task 的运行时间和处理数据量。异常缓慢的 Task 通常会显示明显的运行时间差异和处理数据量异常。
- 任务执行阶段: 根据 Stage 划分原理,确定发生数据倾斜的 Stage。在 Stage 内部,通过观察各个 Task 分配的数据量,可以进一步确定是否存在数据倾斜。
- 推断数据倾斜代码: 查看 Stage 内部的代码,特别是是否存在 Shuffle 操作。数据倾斜通常与 Shuffle 相关。通过查看 Spark 代码或 SQL 语句,划分 Stage 的算子即可确定是否存在 Shuffle。
- 定位问题算子: 在确认 Stage 和 Shuffle 后,定位问题算子。例如,在 Spark SQL 中,GROUP BY 和 JOIN 操作常常是 Shuffle 操作,可能导致数据倾斜。在代码中定位 Shuffle 操作,有助于找到具体的问题算子。
- 处理数据倾斜: 采取相应的数据倾斜处理措施,如使用合适的分区策略、使用广播变量优化、采用均匀分布的方式处理数据等,以减轻数据倾斜的影响。
- 监控 Executor 和 Driver: 观察 Executor 是否丢失、OOM 是否发生,以及 Driver 是否出现 OOM。这些异常可能与数据倾斜有关,通过监控工具定位具体原因。
- 检查 Task 失败日志: 查看异常缓慢的 Task 的详细日志,检查是否存在错误、异常或超时等问题,以便更好地定位问题。
以上步骤有助于在发生 Task 异常缓慢时迅速定位问题,并采取相应的优化和调优策略,以提高整体任务的性能和稳定性。
某个 task 莫名其妙内存溢出的情况
当某个 Task 发生内存溢出时,建议采用以下步骤进行问题定位:
查看异常栈信息:
- 在 yarn-client 模式下,查看本地日志的异常栈信息。
- 在 yarn-cluster 模式下,通过 YARN 查看对应 Application 的日志,找到异常栈信息。
定位异常发生的代码行:
- 根据异常栈信息,定位到具体的代码行。通常,内存溢出会发生在某个算子的执行过程中。
检查是否存在 Shuffle 操作:
- 在定位到的代码行附近,检查是否存在 Shuffle 操作,例如,reduceByKey、groupByKey 等。这些操作可能是导致数据倾斜的关键。
使用采样数据进行分析:
- 对 pairs 进行采样,获取一个样本数据集。
- 使用 countByKey 算子统计样本数据中每个 key 出现的次数。
- 遍历和打印样本数据中各个 key 的出现次数。
1 |
|
观察 Task 运行时间和数据分配:
- 通过 Spark Web UI 查看异常 Task 所在 Stage 的运行时间和数据分配情况。异常 Task 的运行时间和数据分配可能与正常 Task 明显不同。
检查整体任务情况:
- 观察整体任务的运行情况,检查是否有 Executor 丢失、Driver OOM 等异常。这些异常也可能与数据倾斜有关。
采取相应的数据倾斜处理措施:
- 如果确认发生了数据倾斜,采取相应的处理措施,例如优化分区策略、使用广播变量优化、调整均匀分布策略等。
细致检查代码逻辑:
- 对定位到的代码附近进行仔细检查,确保算子逻辑正确,避免因代码 bug 导致的内存溢出。
通过以上步骤,可以帮助更快速地定位和处理某个 Task 内存溢出的情况,同时准确判断是否是由于数据倾斜导致的异常。
数据倾斜解决方案
使用 Hive ETL 预处理数据
适用场景
这种方案适用于 Hive 表中的数据存在明显不均匀分布,导致 Spark 作业中频繁执行 shuffle 算子而产生数据倾斜的情况。如果业务场景允许对 Hive 表进行预处理,比如对数据进行聚合或与其他表进行 join 操作,那么可以考虑使用 Hive ETL 提前处理数据。
实现思路
- 评估数据分布:通过分析 Hive 表中的数据分布情况,判断是否存在严重的数据倾斜,特别是某些 key 对应的数据量远远超过其他 key。
- Hive ETL 预处理:在 Hive 中编写 ETL 任务,对数据进行预处理,可能包括按照 key 进行聚合或与其他表进行 join 操作。这样可以在 Hive 层面避免 Spark 中频繁执行 shuffle 算子。
- Spark 作业处理:在 Spark 作业中,不直接使用原始的 Hive 表,而是使用经过预处理的 Hive 表。由于数据已经提前聚合或处理,避免了 Spark 中的 shuffle 操作,从而减轻了数据倾斜的可能性。
原理
通过在 Hive 中进行预处理,提前对数据进行聚合或 join 操作,避免了在 Spark 作业中执行 shuffle 算子导致的数据倾斜。这样虽然并未彻底解决数据倾斜问题,但至少将数据倾斜的发生时机提前到了 Hive ETL 阶段。
优缺点
优点:
- 实现简便,不需要在 Spark 作业中额外处理数据倾斜。
- 提高 Spark 作业的性能,避免了频繁执行 shuffle 算子导致的性能问题。
缺点:
- 未根本解决数据倾斜问题,仍在 Hive ETL 阶段可能发生数据倾斜。
- Hive ETL 过程可能会变得较慢,特别是在数据倾斜严重的情况下。
实践经验
这种方案在一些需要频繁调用 Spark 作业的场景中表现较好,特别是在要求 Spark 作业执行性能较高的情况下。将数据倾斜的处理提前到 Hive ETL,可以降低 Spark 作业的复杂性,提高执行效率,从而在实际应用中提供更好的用户体验。
过滤少数导致倾斜的 key
适用场景
这种方案适用于发现导致数据倾斜的是极少数的 key,且这些 key 的数据量极大,而对计算结果影响较小的情况。例如,99%的 key 对应的数据量较小,只有极少数的 key 对应了大量的数据,导致了数据倾斜。
实现思路
静态过滤: 如果已知导致数据倾斜的具体 key,可以在 Spark SQL 中使用
WHERE
子句或在 Spark Core 中使用filter
算子直接过滤掉这些 key。1
2-- Spark SQL 中的静态过滤
SELECT * FROM your_table WHERE key NOT IN ('skewed_key1', 'skewed_key2');1
2// Spark Core 中的静态过滤
val filteredRDD = originalRDD.filter(record => record._1 != "skewed_key1" && record._1 != "skewed_key2")动态过滤: 如果需要在每次作业执行时动态确定哪些 key 的数据量较大,可以使用
sample
算子对 RDD 进行采样,然后计算每个 key 的数量,最后选择数据量最大的几个 key 进行过滤。1
2
3
4
5// Spark Core 中的动态过滤
val sampledData = originalRDD.sample(false, 0.1) // 采样 10% 的数据
val keyCounts = sampledData.countByKey()
val skewedKeys = keyCounts.filter { case (key, count) => count > threshold }.keys.toList
val filteredRDD = originalRDD.filter { case (key, _) => !skewedKeys.contains(key) }
原理
通过过滤掉导致数据倾斜的极少数 key,这些 key 不再参与后续的计算,从而避免了数据倾斜对计算造成的影响。
优缺点
优点:
- 实现简单,易于理解和部署。
- 在特定场景下,对计算结果的影响较小,可以完全规避数据倾斜。
缺点:
- 适用场景有限,只适用于导致数据倾斜的 key 极少且对计算影响不大的情况。
实践经验
在实际项目中,我们曾遇到某个 key 在某一天的数据异常,导致了 Spark 作业运行时的内存溢出。通过采样计算数据量最大的几个 key,然后动态过滤掉这些 key,成功规避了数据倾斜对作业的影响,保证了作业的正常执行。此方法在静态和动态两种过滤方式中都可根据实际需求选择。
提高 Shuffle 操作的并行度
适用场景
这种方案适用于必须处理数据倾斜的情况,是处理数据倾斜最直接、简单的一种方式。
实现思路
在对 RDD 执行 Shuffle 算子时,可以通过为 Shuffle 算子传递参数的方式设置并行度,例如 reduceByKey(1000)
。这里的参数即为 Spark 配置项 spark.sql.shuffle.partitions
,它表示 Shuffle 操作的并行度,默认值为 200。增加这个参数的值,即增加 Shuffle read task 的数量,可以使原本分配给单个 task 处理的数据分散到多个 task 上,从而降低每个 task 处理的数据量,缓解数据倾斜。
原理
通过增加 Shuffle read task 的数量,实现了对原本分配给单个 task 处理的数据进行分散,每个 task 处理更少的数据,从而减轻了数据倾斜的问题。具体原理如下图所示:
优缺点
优点:
- 实现简单,易于理解和部署。
- 在很多场景下都能够有效缓解和减轻数据倾斜的影响。
缺点:
- 无法彻底解决数据倾斜,特别是当某个 key 的数据量极大时,无论并行度设置多大,该 key 仍然会分配到一个 task 处理,导致数据倾斜。
- 效果在某些极端情况下有限。
实践经验
这种方案通常用作发现数据倾斜时的第一手段,尝试通过简单的方式缓解数据倾斜。在实际项目中,由于无法解决极端情况下的数据倾斜,往往需要结合其他方案使用,综合考虑各种情况。
两阶段聚合(局部聚合+全局聚合)
适用场景
这种方案适用于对 RDD 执行聚合类 shuffle 算子,或者在 Spark SQL 中使用 group by 语句进行分组聚合的情况。特别在数据倾斜导致的聚合类操作中,效果显著。
实现思路
局部聚合: 给每个 key 打上一个随机前缀,例如在 10 以内的随机数。原先相同的 key 会变成不同的,例如 (hello, 1) (hello, 1) (hello, 1) (hello, 1) 可以变成 (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。
局部聚合操作: 对打上随机前缀的数据执行 reduceByKey 等聚合操作,完成局部聚合。得到局部聚合结果,例如 (1_hello, 2) (2_hello, 2)。
去除随机前缀: 去除 RDD 中每个 key 的随机前缀,将数据还原为原始状态,例如 (hello, 2)(hello,2)。
全局聚合操作: 对去除了随机前缀的 RDD 进行全局聚合操作,得到最终结果,例如 (hello, 4)。
示例代码如下:
1 |
|
原理
通过附加随机前缀,将原本相同的 key 变成多个不同的 key,实现了将原本被一个 task 处理的数据分散到多个 task 上做局部聚合,进而解决单个 task 处理数据量过多的问题。去除随机前缀后,再次进行全局聚合,得到最终结果。具体原理见下图:
优缺点
优点:
- 对于聚合类的 shuffle 操作导致的数据倾斜,效果显著。通常可以解决或大幅度缓解数据倾斜,提升 Spark 作业性能数倍以上。
缺点:
- 仅适用于聚合类的 shuffle 操作,对于 join 类的 shuffle 操作需要使用其他解决方案。
实践经验
在处理聚合类 shuffle 操作导致的数据倾斜
时,两阶段聚合是一种非常有效的方案。然而,对于不同的场景和数据分布,需要根据实际情况选择合适的解决方案,有时也需要结合其他方法来进一步提高处理效果。
将 Reduce Join 转为 Map Join
适用场景
在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。
实现思路
不使用 join 算子进行连接操作,而使用 Broadcast 变量与 map 类算子实现 join 操作,进而完全规避掉 shuffle 类的操作,彻底避免数据倾斜的发生和出现。将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的内存中来,然后对其创建一个 Broadcast 变量,广播给其他 Executor 节点;
接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。
示例如下:
1 |
|
原理
普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。
但是如果一个 RDD 是比较小的,则可以采用广播小 RDD 全量数据+map 算子来实现与 join 同样的效果,也就是 map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。具体原理如下图所示。
优缺点
优点:对 join 操作导致的数据倾斜,效果非常好,因为根本就不会发生 shuffle,也就根本不会发生数据倾斜。
缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver 和每个 Executor 内存中都会驻留一份小 RDD 的全量数据。如果我们广播出去的 RDD 数据比较大,比如10G 以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。
采样倾斜 Key 并分拆 Join 操作
适用场景
在两个 RDD 或 Hive 表进行 join 时,如果数据量巨大且无法采用其他解决方案,而数据倾斜主要由其中某个 RDD 或表中极少数 key 的数据量过大引起,此时可考虑采用该解决方案。
实现思路
- 对数据量巨大的 RDD(例如 rdd1)进行采样,得到一个样本 RDD。
- 统计样本 RDD 中每个 key 的数量,并找出数据量最大的几个 key。
- 将 rdd1 中与这几个 key 相关的数据分拆出来,形成一个独立的 RDD,并为每个 key 加上 0 到 n 之间的随机前缀。
- 将 rdd1 中与这几个 key 无关的普通 key 形成另一个 RDD。
- 将需要进行 join 的另一个 RDD(例如 rdd2)中与这几个 key 相关的数据也分拆出来,并每条数据膨胀成 n 条,每条数据都附加一个 0 到 n 的前缀。
- 将膨胀后的 rdd2 与附加随机前缀的独立 RDD 进行 join,此时原本相同的 key 被打散成 n 份,分散到多个 task 中进行 join。
- 对 rdd1 中普通 key 与 rdd2 进行正常的 join 操作。
- 将两次 join 的结果使用 union 合并。
示例代码如下:
1 |
|
原理
对于导致数据倾斜的极少数 key,通过采样得到样本数据,找出数据量最大的几个 key。然后将这几个 key 对应的数据从原 RDD 中拆分出来,形成一个独立的 RDD,为每个 key 加上 0 到 n 之间的随机前缀。同时,将需要 join 的另一个 RDD 中与这几个 key 相关的数据分拆并膨胀 n 倍,每条数据都附加一个 0 到 n 的前缀。最后,将两个独立的 RDD 进行 join 操作,再与普通 key 的 RDD 进行正常的 join 操作,最终合并结果。
优缺点
优点:
- 对于仅有极少数 key 导致的数据倾斜,采用该方式可以有效地打散 key 进行 join。
- 不需要对全量数据进行扩容,避免了占用过多内存。
缺点:
- 如果导致倾斜的 key 数量较多,则该方式不适用。
使用随机前缀和扩容 RDD 进行 Join
适用场景
在进行 Join 操作时,如果其中一个 RDD 中存在大量的 Key 导致数据倾斜,而其他解决方案无法奏效,可以考虑使用随机前缀和扩容 RDD 进行 Join 的方式。
实现思路
- 将一个 Key 分布相对均匀的 RDD 进行扩容,将每条数据扩容成 n 条,每条数据都附加一个 0 到 n 的前缀。
- 将另一个可能导致数据倾斜的 RDD 中的每条数据都打上一个 n 以内的随机前缀。
- 对两个处理后的 RDD 进行 Join 操作。
示例代码如下:
1 |
|
原理
通过为一个 Key 分布相对均匀的 RDD 进行扩容,将每条数据扩容成 n 条,同时附加一个 0 到 n 的前缀。对另一个可能导致数据倾斜的 RDD 中的每条数据都打上一个 n 以内的随机前缀。最终进行 Join 操作,将原本相同的 Key 打散成 n 份,分散到多个 Task 中进行 Join。
优缺点
优点:
- 对 Join 类型的数据倾斜具有较好的处理效果,能够显著提升性能。
- 相较于其他解决方案,对内存资源的占用相对较小。
缺点:
- 仅能缓解数据倾斜,而无法彻底避免。
- 对整个 RDD 进行扩容,对内存资源要求较高。
实践经验
在一个实际数据需求中,优化前的作业执行时间约为 60 分钟,使用该方案优化后,执行时间缩短到 10 分钟左右,性能提升了 6 倍。
多种方案组合使用
在实际应用中,处理数据倾斜的场景可能会更为复杂,单一的解决方案可能无法完全解决问题。因此,可以考虑组合多种方案来处理数据倾斜,特别是在处理较为复杂的数据倾斜场景时。
组合方案示例:
HiveETL 预处理和过滤:
- 通过 HiveETL 预处理,对数据进行初步清洗和转换,提前过滤一部分可能导致数据倾斜的键值对。
- 针对少数导致倾斜的 key,可以采用 HiveETL 预处理进行特殊处理,如聚合或拆分。
提高 Shuffle 操作并行度:
- 针对 Shuffle 操作,通过增加并行度的方式,提高任务的并行度,减轻每个任务处理的数据量。
- 通过设置合适的 shuffle 参数,如
spark.sql.shuffle.partitions
,确保适应不同场景的并行度。
局部聚合 + 全局聚合:
- 对于聚合类的 Shuffle 操作,可以采用局部聚合 + 全局聚合的方案,先在局部聚合阶段缓解数据倾斜,再进行全局聚合获得最终结果。
采样倾斜 key 并分拆 Join 操作:
- 针对 Join 操作,通过采样倾斜的 key,识别并分拆出导致数据倾斜的 key。
- 对导致倾斜的 key 进行特殊处理,如使用随机前缀和扩容 RDD 进行 Join。
使用随机前缀和扩容 RDD 进行 Join:
- 针对有大量 key 导致的数据倾斜,使用随机前缀和扩容 RDD 的方式,将数据打散,分散到多个 Task 中进行 Join。
原理:
通过综合利用不同方案,根据具体场景和需求,逐步解决数据倾斜问题。每个方案在特定情况下都有其优势,组合使用可以更灵活地适应不同的数据倾斜情况。
优劣势:
优势:
- 针对多个环节的数据倾斜,可以有针对性地采用不同的方案,提高处理的效果。
- 灵活性高,适用于各种不同的数据分布和业务场景。
劣势:
- 需要更多的调研和分析,对不同阶段的数据倾斜进行合理选择和组合。
- 组合使用可能带来一定的复杂性,需要谨慎设计和调试。
实践经验:
在实践中,深入理解各种方案的思路和原理后,根据具体情况有选择地组合使用这些方案。通过灵活运用多种方案,可以更好地解决复杂场景下的数据倾斜问题,提高作业性能。
总结:
数据倾斜处理并非一劳永逸,而是需要根据具体场景灵活选择和组合不同的方案。多种方案相互配合,可以更好地解决复杂的数据倾斜挑战。