黑洞

这里藏着一些独特的想法

0%

Hive之数据倾斜优化

概述

数据倾斜在MapReduce计算框架中经常发生。通俗理解,该现象指的是在整个计算过程中,大量相同的key被分配到了同一个任务上,造成“一个人累死、其他人闲死”的状况,这违背了分布式计算的初衷,使得整体的执行效率十分低下。

日常工作中数据倾斜主要发生在Reduce阶段,而很少发生在Map阶段,其原因是Map端的数据倾斜一般是由于HDFS数据存储不均匀造成的(公司的日志存储几乎都是均匀分块存储,每个文件大小基本固定),而Reduce阶段的数据倾斜几乎都是因为分析师没有考虑到某种key值数据量偏多的情况而导致的。

那么,数据倾斜会导致什么?

  1. 执行效率下降(整个执行时间,就看最后一个reduce结束时间)。
  2. 由于其中某几个reduce长时间运行,资源长期被占用。一旦超时,YARN强制回收资源,导致运行失败。
  3. 导致节点出现宕机问题。

什么时候会发生数据倾斜?

  1. 执行多表join查询的时候。
  2. 执行group by的时候。

group by 数据倾斜

map阶段 提前聚合

在每一个mapTask中进行提前聚合操作,将聚合之后结果发送给reduce,完成最终的聚合,从而减少从map到reduce的数据量,减轻数据倾斜压力。

1
2
-- 开启map端提前聚合操作(combiner)
set hive.map.aggr=true;

负载均衡

采用两个MR来解决,第一个MR负责将数据均匀落在不同reduce上,进行聚合统计操作,形成一个局部的结果。然后运行第二个MR读取第一个MR的局部结果,相同key发往同一个reduce,完成最终聚合统计操作。

1
2
-- 大combiner
set hive.groupby.skewindata=true;

注意:一旦启用负载均衡,Hive将不支持一个查询中使用多次distinct。一旦使用,就会报错!

1
2
3
4
5
-- 其中: 1,2,3 是可以正常执行的,4会报错。
(1) SELECT count(DISTINCT uid) FROM log
(2) SELECT ip, count(DISTINCT uid) FROM log GROUP BY ip
(3) SELECT ip, count(DISTINCT uid, uname) FROM log GROUP BY ip
(4) SELECT ip, count(DISTINCT uid), count(DISTINCT uname) FROM log GROUP BY ip

join 数据倾斜

map join

具体的实现方法和原理不重复说了,之前的优化大杂烩中有讲到。

排除法

由于map join的使用条件非常严苛,有时并不能满足。这时候我们可以将容易倾斜的值排除掉,单独用另一个MR来处理它们,方法可分为两种。

编译期排除

当明确知道表中哪些key的值有倾斜问题,可以在编译期解决。在建表的时候,提前设置好即可。这样在执行的时候,Hive会直接将这些倾斜key的值从这个MR排除掉,单独找一个MR来处理。

1
set hive.optimize.skewjoin.compiletime=true;

建表:

1
2
3
4
5
CREATE TABLE list_bucket_single (key STRING, value STRING)
-- 倾斜的字段和需要拆分的key值
SKEWED BY (key) ON (1,5,6)
-- 为倾斜值创建子目录单独存放
[STORED AS DIRECTORIES];

运行期排除

在执行的过程中,Hive会记录每一个key出现的次数。当出现次数达到设置的阈值后,认为这个key有倾斜的趋势,直接将这个key对应数据排除掉,单独找一个MR来处理。

1
2
set hive.optimize.skewjoin=true; -- 开启运行期倾斜解决join。
set hive.skewjoin.key=100000; -- 当key出现多少个的时候,认为有倾斜。

建议:

  • 如果提前知道表中有那些key有倾斜,在编译期解决即可。
  • 如果仅知道一部分, 对于其他key无法保证,那么编译期和运行期同时开启。

union all 优化

应用了表连接倾斜优化以后,会在执行计划中插入一个新的union操作,此时建议开启对union的优化配置。

1
2
3
4
5
set hive.optimize.skewjoin=true;
set hive.optimize.skewjoin.compiletime=true;

-- 此项配置减少对Union all子查询中间结果的二次读写
set hive.optimize.union.remove=true;

count distinct 优化

由于SQL中的Distinct操作本身会有一个全局排序的过程,一般情况下,不建议采用Count Distinct方式进行去重计数,除非表的数量比较小。

当SQL中不存在分组字段时,Count Distinct操作仅生成一个Reduce任务,该任务会对全部数据进行去重统计;

当SQL中存在分组字段时,可能某些Reduce任务需要去重统计的数量非常大。在这种情况下,我们可以通过以下方式替换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
select
day,
count(distinct id) as id_cnt
from table_1
group by day;

-- 改写为
select day, count(1) as id_cnt
from
(
select day , id
from table_1
group by day, id
) tmp
group by day;

如果语句中存在多个Distinct命令,开发者需要评估下用空间换时间的方法是否能够提升效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
select
day,
count(distinct id1) as id1_cnt,
count(distinct id2) as id2_cnt
from table_1
group by day;

-- 改写为
select
day,
sum(if(tag = 'id1', 1, 0)) as id1_cnt,
sum(if(tag = 'id2', 1, 0)) as id2_cnt
from
(
select day, id1 as id, 'id1' as tag from table_1 group by day, id1
union all
select day, id2 as id, 'id2' as tag from table_1 group by day, id2
) tmp
group by day;

Reference: https://www.getui.com/college/2021010444

如果觉得文章写得不错或对您有帮助,请我喝杯柠檬茶吧!