本文从数据歪斜的危害、现象、要素等方面,由浅入深论述Spark数据歪斜及其处置方案。
一、什么是数据歪斜
对 Spark/Hadoop 这样的散布式大数据系统来讲,数据量大并无法怕,可怕的是数据歪斜。
关于散布式系统而言,现实状况下,随着系统规模(节点数量)的参与,运行全体耗时线性降低。假设一台机器处置一批少量数据须要120分钟,当机器数量参与到3台时,现实的耗时为120/ 3 = 40分钟。然而,想做到散布式状况下每台机器口头期间是单机时的1 /N,就必需保障每台机器的义务量相等。可怜的是,很多时刻,义务的调配是不平均的,甚至不平均到大部分义务被调配到一般机器上,其它大部分机器所调配的义务量只占总得的小部分。比如一台机器担任处置80% 的义务,另外两台机器各处置 10% 的义务。
『不患多而患不均』,这是散布式环境下最大的疑问。象征着计算才干不是线性扩展的,而是存在短板效应: 一个 Stage 所消耗的期间,是由最慢的那个 Task选择。
由于同一个 Stage 内的一切 task 口头相反的计算,在扫除不同计算节点计算才干差异的前提下,不同 task 之间耗时的差异重要由该 task所处置的数据量选择。所以,要想施展散布式系统并行计算的长处,就必需处置数据歪斜疑问。
二、数据歪斜的危害
当出现数据歪斜时,小量义务耗时远高于其它义务,从而使得全体耗时过大,未能充散施展散布式系统的并行计算长处。
另外,当出现数据歪斜时,部分义务处置的数据量过大,或许形成内存无余使得义务失败,并进而引进整个运行失败。
三、数据歪斜的现象
当发现如下现象时,十有八九是出现数据歪斜了:
在 Spark streaming 程序中,数据歪斜更容易出现,特意是在程序中蕴含一些相似 sql 的 join、group 这种操作的时刻。由于Spark Streaming 程序在运转的时刻,咱们普通不会调配特意多的内存,因此一旦在这个环节中出现一些数据歪斜,就十分容易形成 OOM。
四、数据歪斜的要素
在启动 shuffle 的时刻,必需将各个节点上相反的 key 拉取到某个节点上的一个 task 来启动处置,比如依照 key 启动聚合或 join等操作。此时假设某个 key 对应的数据量特意大的话,就会出现数据歪斜。比如大部分 key 对应10条数据,然而一般 key 却对应了100万条数据,那么大部分task 或许就只会调配到10条数据,而后1秒钟就运转完了;然而一般 task 或许调配到了100万数据,要运转一两个小时。
因此出现数据歪斜的时刻,Spark 作业看起来会运转得十分缓慢,甚至或许由于某个 task 处置的数据量过大造成内存溢出。
五、疑问发现与定位
1、经过 Spark Web UI
经过 Spark Web UI 来检查运转的 stage 各个 task 调配的数据量(Shuffle ReadSize/Records),从而进一步确定是不是 task 调配的数据不平均造成了数据歪斜。
知道数据歪斜出当初哪一个 stage 之后,接着咱们就须要依据 stage 划分原理,推算进去出现歪斜的那个 stage对应代码中的哪一部分,这部分代码中必需会有一个 shuffle 类算子。可以经过 countByKey 检查各个 key 的散布。
数据歪斜只会出当初 shuffle 环节中。这里给大家列举一些罕用的并且或许会触发 shuffle 操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据歪斜时,或许就是你的代码中经常使用了这些算子中的某一个所造成的。
2、经过 key 统计
也可以经过抽样统计 key 的出现次数验证。
由于数据量渺小,可以驳回抽样的方式,对数据启动抽样,统计出现的次数,依据出现次数大小排序取出前几个:
.().(, ).( (, )).( ).( (., .)).()// 依据 key 出现次数启动排序.()
假设发现少数数据散布都较为平均,而一般数据比其余数据大上若干个数量级,则说明出现了数据歪斜。
六、如何缓解数据歪斜
基本思绪
业务逻辑: 咱们从业务逻辑的层面过去优化数据歪斜,比如要统计不同市区的订单状况,那么咱们独自对这一线市区来做 count,最后和其它市区做整合。
程序成功: 比如说在 Hive 中,经常遇到 count(distinct)操作,这样会造成最终只要一个 reduce,咱们可以先 group再在外面包一层 count,就可以了;在 Spark 中经常使用 reduceByKey 替代 groupByKey 等。
参数调优: Hadoop 和 Spark 都自带了很多的参数和机制来调理数据歪斜,正当应用它们就能处置大部分疑问。
思绪1. 过滤意外数据
假设造成数据歪斜的 key 是意外数据,那么便捷的过滤掉就可以了。
首先要对 key 启动剖析,判别是哪些 key 形成数据歪斜。详细方法上方曾经引见过了,这里不赘述。
而后对这些 key 对应的记载启动剖析:
处置方案
关于第 1,2 种状况,间接对数据启动过滤即可。
第3种状况则须要不凡的处置,详细咱们上方详细引见。
思绪2. 提高 shuffle 并行度
Spark 在做 Shuffle 时,自动经常使用 HashPartitioner(非 HashShuffle)对数据启动分区。假设并行度设置的不适宜,或许形成少量不相反的 Key 对应的数据被调配到了同一个 Task 上,形成该 Task所处置的数据远大于其它 Task,从而形成数据歪斜。
假设调整 Shuffle 时的并行度,使得原本被调配到同一 Task 的不同 Key 发配到不同 Task 上处置,则可降低原 Task所需处置的数据量,从而缓解数据歪斜疑问形成的短板效应。
(1)操作流程
(2)适用场景
少量不同的 Key 被调配到了相反的 Task 形成该 Task 数据量过大。
(3)处置方案
调整并行度。普通是增大并行度,但有时如减小并行度也可到达成果。
(4)长处
成功便捷,只要要参数调优。可用最小的代价处置疑问。普通假设出现数据歪斜,都可以经过这种方法先实验几次,假设疑问未处置,再尝试其它方法。
(5)劣势
适用场景少,只是让每个 task 口头更少的不同的key。无法处置一般key特意大的状况形成的歪斜,假设某些 key 的大小十分大,即使一个 task独自口头它,也会遭到数据歪斜的困扰。并且该方法普通只能缓解数据歪斜,没有彻底消弭疑问。从通常阅从来看,其成果普通。
TIPS 可以把数据歪斜类比为 hash 抵触。提高并行度就相似于 提高 hash 表的大小。
思绪3. 自定义 Partitioner
(1)原理
经常使用自定义的 Partitioner(默以为 HashPartitioner),将原本被调配到同一个 Task 的不同 Key 调配到不同Task。
例如,咱们在 groupByKey 算子上,经常使用自定义的 Partitioner:
这个做法相当于自定义 hash 表的 哈希函数。
(2)适用场景
少量不同的 Key 被调配到了相反的 Task 形成该 Task 数据量过大。
(3)处置方案
经常使用自定义的 Partitioner 成功类替代自动的 HashPartitioner,尽量将一切不同的 Key 平均调配到不同的 Task 中。
(4)长处
不影响原有的并行度设计。假设扭转并行度,后续 Stage 的并行度也会自动扭转,或许会影响后续 Stage。
(5)劣势
适用场景有限,只能将不同 Key 扩散开,关于同一 Key对应数据集十分大的场景不适用。成果与调整并行度相似,只能缓解数据歪斜而不能齐全消弭数据歪斜。而且须要依据数据特点自定义公用的Partitioner,不够灵敏。
思绪4. Reduce 端 Join 转化为 Map 端 Join
经过 Spark 的 Broadcast 机制,将 Reduce 端 Join 转化为 Map 端 Join,这象征着 Spark 如今不须要跨节点做shuffle 而是间接经过本地文件启动 join,从而齐全消弭 Shuffle 带来的数据歪斜。
其中 A 是比拟小的> (1)适用场景
介入Join的一边数据集足够小,可被加载进 Driver 并经过 Broadcast 方法广播到各个 Executor 中。
(2)处置方案
在 Java/Scala 代码中将小数据集数据拉取到 Driver,而后经过 Broadcast 方案将小数据集的数据广播到各Executor。或许在经常使用 SQL 前,将 Broadcast 的阈值调整得足够大,从而使 Broadcast 失效。进而将 Reduce Join 交流为Map Join。
(3)长处
防止了 Shuffle,彻底消弭了数据歪斜发生的条件,可极大优化性能。
(4)劣势
由于是先将小数据经过 Broadcase 发送到每个 executor 上,所以须要介入 Join 的一方数据集足够小,并且重要适用于 Join的场景,不适宜聚合的场景,适用条件有限。
思绪5. 拆分 join 再 union
思绪很便捷,就是将一个 join 拆分红 歪斜数据集 Join 和 非歪斜数据集 Join,最后启动 union:
(1)适用场景
两张表都比拟大,无法经常使用 Map 端 Join。其中一个 RDD 有少数几个 Key 的数据量过大,另外一个 RDD 的 Key 散布较为平均。
(2)处置方案
将有数据歪斜的 RDD 中歪斜 Key 对应的数据集独自抽取进去加上随机前缀,另外一个 RDD每条数据区分与随机前缀联合构成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),而后将二者Join并去掉前缀。而后将不蕴含歪斜Key的残余数据启动Join。最后将两次Join的结果集经过union兼并,即可失掉所有Join结果。
(3)长处
相关于 Map 则 Join,更能顺应大数据集的Join。假设资源短缺,歪斜部分数据集与非歪斜部分数据集可并前启动,效率优化显著。且只针对歪斜部分的数据做数据扩展,参与的资源消耗有限。
(4)劣势
假设歪斜 Key 十分多,则另一侧数据收缩十分大,此方案不适用。而且此时对歪斜 Key 与非歪斜 Key分开处置,须要扫描数据集两遍,参与了开支。
思绪6. 大表 key 加盐,小表扩展 N 倍 jion
假设出现数据歪斜的 Key 比拟多,上一种方法将这些少量的歪斜 Key分拆进去,意义不大。此时更适宜间接对存在数据歪斜的数据集所有加上随机前缀,而后对另外一个不存在重大数据歪斜的数据集全体与随机前缀集作笛卡尔乘积(行将数据量扩展N倍)。
其实就是上一个方法的特例或许简化。少了拆分,也就没有 union。
(1)适用场景
一个数据集存在的歪斜 Key 比拟多,另外一个数据集数据散布比拟平均。
(2)长处
对大部分场景都适用,成果不错。
(3)劣势
须要将一个数据集全体扩展 N 倍,会参与资源消耗。
思绪7. map 端先部分聚合
在 map 端加个 combiner 函数启动部分聚合。加上 combiner 相当于提早启动 reduce ,就会把一个 mapper 中的相反 key启动聚合,缩小 shuffle 环节中数据量 以及 reduce 端的计算量。这种方法可以有效的缓解数据歪斜疑问,然而假设造成数据歪斜的 key少量散布在不同的 mapper 的时刻,这种方法就不是很有效了。
TIPS 经常使用 reduceByKey 而不是 groupByKey。
思绪8. 加盐部分聚合 + 去盐全局聚合
这个方案的**成功思绪就是启动两阶段聚合。第一次性是部分聚合,先给每个 key 都打上一个 1~n 的随机数,比如 3 以内的随机数,此时原先一样的 key就变成不一样的了,比如 (hello, 1) (hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (3_hello, 1) (2_hello, 1) (1_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,口头reduceByKey 等聚合操作,启动部分聚合,那么部分聚合结果,就会变成了 (1_hello, 2) (2_hello, 2) (3_hello,1)。而后将各个 key 的前缀给去掉,就会变成 (hello, 2) (hello, 2) (hello,1),再次启动全局聚合操作,就可以失掉最终结果了,比如 (hello, 5)。
不过启动两次 mapreduce,性能稍微比一次性的差些。
七、Hadoop 中的数据歪斜
Hadoop 中间接贴近用户经常使用的是 Mapreduce 程序和 Hive 程序,虽说 Hive 最后也是用 MR 来口头(至少目前 Hive内存计算并不遍及),然而毕竟写的内容逻辑区别很大,一个是程序,一个是Sql,因此这里稍作区分。
Hadoop 中的数据歪斜重要表如今 ruduce 阶段卡在99.99%,不时99.99%不能完结。
这里假设详细的看日志或许和监控界面的话会发现:
阅历: Hive的数据歪斜,普通都出当初 Sql 中 Group 和 On 上,而且和数据逻辑绑定比拟深。
优化方法
说明
hive.map.aggr=true: 在map中会做部分汇集操作,效率更高但须要更多的内存。
hive.groupby.skewindata=true: 数据歪斜时负载平衡,入选项设定为true,生成的查问方案会有两个MRJob。第一个MRJob中,Map的输入结果汇合会随机散布到Reduce中,每个Reduce做部分聚合操作,并输入结果,这样处置的结果是相反的GroupByKey有或许被散发到不同的Reduce中,从而到达负载平衡的目标;第二个MRJob再依据预处置的数据结果依照GroupByKey散布到Reduce中(这个环节可以保障相反的GroupBy Key被散布到同一个Reduce中),最后成功最终的聚合操作。