博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hive数据倾斜
阅读量:4206 次
发布时间:2019-05-26

本文共 5344 字,大约阅读时间需要 17 分钟。

Hive数据倾斜问题

  数据倾斜主要表现在MapReduce程序执行时,Reduce节点大部分执行完毕,但由于个别Reduce节点运行很慢,导致整个程序的处理时间很长。这是因为某一个Key的条数比其他Key多很多,导致这条Key所在的Reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完。

发生数据倾斜的Situation

数据倾斜可能会发生在group过程/join过程

join过程

大表和小表关联时

  一个上千万行的记录表和一个几千行表之间join关联时,容易发生数据倾斜。Hive中小表与大表join的优化策略——把重复关联键少的表放在join前面做关联可以提高join的效率

  Reduce在接收全部Map的输出后会有一个排序所有键值对并合并写入磁盘文件的操作。写入磁盘(spill)有可能是多次的,因此有可能会生成多个临时文件,但是最终都要合并成一个文件,即最终每一个Reduce都只处理一个文件。不论多复杂的Hive查询,最终都要转化成Mapreduce的job去执行,因此Hive对于关联的实现和Mapreduce对于关联的实现类似,即:把关联键和标记是在join左边还是右边的标识位作为组合键(key),把一条记录以及标记是在join左边还是右边的标识位组合起来作为值(value)。在Reduce的shuffle阶段,按照组合键的关联键进行主排序,当关联键相同时,再按照标识位进行辅助排序。而在分区段时,只用组合键中的关联键进行分区段,这样关联键相同的记录就会放在同一个value list中,同时保证了join左边的表的记录在value list的前面,而join右边的表的记录在value list的后面。
  对于A join B ON (A.id = b.id) ,假设A表和B表都有1条id =6的记录,那么A表这条记录的组合键是(6,0),B表这条记录的组合键是(6,1)。排序时可以保证A表的记录在B表的记录的前面。而在Reduce做处理时,把id=6的放在同一个value list中,形成 key = 6,value list = [A表id=6的记录,B表id=6的记录]。当两个表做关联时,Reduce会一起处理id相同的所有记录。把value list用数组来表示。
  1) Reduce先读取第一条记录v[0],如果发现v[0]是B表的记录,那说明没有A表的记录,最终不会关联输出,因此不用再继续处理这个id了,读取v[0]用了1次读取操作。如果发现v[0]到v[length-1]全部是A表的记录,那说明没有B表的记录,同样最终不会关联输出,但是这里注意,已经对value做了length次的读取操作。
  2) 例如A表id=6有1条记录,B表id=6有10条记录。首先读取v[0]发现是A表的记录,用了1次读取操作。然后再读取v[1]发现是B表的操作,这时v[0]和v[1]可以直接关联输出了,累计用了2次操作。这时候Reduce已经知道从v[1]开始后面都是B 表的记录了,因此可以直接用v[0]依次和v[2],v[3]……v[10]做关联操作并输出,累计用了11次操作。
  3) 换过来,假设A表id=6有10条记录,B表id=6有1条记录。首先读取v[0]发现是A表的记录,用了1次读取操作。然后再读取v[1]发现依然是A表的记录,累计用了2次读取操作。以此类推,读取v[9]时发现还是A表的记录,累计用了10次读取操作。然后读取最后1条记录v[10]发现是B表的记录,可以将v[0]和v[10]进行关联输出,累计用了11次操作。接下来可以直接把v[1]~v[9]分别与v[10]进行关联输出,累计用了20次操作。
  4) 当Reduce检测A表的记录时,还要记录A表同一个key的记录的条数,当发现同一个key的记录个数超过hive.skewjoin.key的值(默认为1000000)时,会在Reduce的日志中打印出该key,并标记为倾斜的关联键。
结论:写在关联左侧的表每有1条重复的关联键时底层就会多1次运算处理。假设A表有一千万个id,平均每个id有3条重复值,那么把A表放在前面做关联就会多做三千万次的运算处理。

解决方式

1.多表关联时,将小表(关联键记录少的表)依次放到前面,这样可以触发Reduce端更少的操作次数,减少运行时间。
2.同时可以使用Map Join让小的维度表缓存到内存。在Map端完成join过程,从而省略掉Redcue端的工作。但是使用这个功能,需要开启map-side join的设置属性:set hive.auto.convert.join=true(默认是false),同时还可以设置使用这个优化的小表的大小:set hive.mapjoin.smalltable.filesize=25000000(默认值25M)

大表和大表的关联

  大表与大表关联,但是其中一张表多是空值或者0比较多,容易shuffle给一个Reduce,造成运行慢。

解决方式1

  这种情况可以对异常值赋一个随机值来分散key,均匀分配给多个Reduce去执行,解决办法的突破点就在于把左表的未关联记录的key尽可能打散,若左表关联字段无效(为空、字段长度为零、字段填充了非整数),则在关联前将左表关联字段设置为一个随机数,再去关联右表,这么做的目的是即使是左表的未关联记录,它的key也分布得十分均匀,比如:

select  ...from trackinfo a left outer join pm_info bon ( case when (a.ext_field7 is not null and length(a.ext_field7) > 0    and a.ext_field7 rlike ' ^ [0-9]+$')      then cast(a.ext_field7 as bigint)      else cast(ceiling(rand() * -65535) as bigint) end = b.id )   #将A表垃圾数据(为null,为0,以及其他类型的数据)赋一个随机的负数,然后将这些数据shuffle到不同Reduce处理。

解决方式2

  当key值都是有效值时,解决办法为设置以下几个参数:

set hive.exec.reducers.bytes.per.reducer = 1000000000//每个节点的Reduce默认是处理1G大小的数据set hive.optimize.skewjoin = true;set hive.skewjoin.key = skew_key_threshold (default = 100000)

  Hive 在运行的时候没有办法判断哪个key 会产生多大的倾斜,所以使用这个参数控制倾斜的阈值,如果超过这个值,新的值会发送给那些还没有达到阈值的Reduce, 一般可以设置成处理的总记录数/Reduce个数的2-4倍。倾斜是经常会存在的,一般select 的层数超过2层,翻译成执行计划多于3个以上的MapReduce job很容易产生倾斜,建议每次运行比较复杂的SQL之前都可以设置这个参数。官方默认的1个Reduce 只处理1G 的算法,那么skew_key_threshold = 1G/平均行长或者默认设成250000000 (差不多算平均行长4个字节)。

group过程

  使用Hive对数据做一些类型统计的时候遇到过某种类型的数据量特别多,而其他类型数据的数据量特别少也会造成数据倾斜。当按照类型进行group by的时候,会将相同的group by字段的Reduce任务需要的数据拉取到同一个节点进行聚合,而当其中每一组的数据量过大时,会出现其他组的计算已经完成而这里还没计算完成,其他节点的一直等待这个节点的任务执行完成,所以会看到一直Map 100%;Reduce 99%的情况。

解决方式1

hive.map.aggr=true  //(默认true) 这个配置项代表是否在map端进行聚合,相当于Combinerhive.groupby.skewindata=true//(默认false)

  有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个MapReduce Job。第一个MapReduce Job 中,Map的输出结果集合会随机分布到 Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照Group By Key分布到Reduce 中(此过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

其他情况

不同数据类型关联产生数据倾斜

  一张表tab的日志,每个商品一条记录,要和商品表关联。tab的日志中有字符串商品id,也有数字的商品id,类型是String的,但商品中的数字id是bigint类型的。造成数据倾斜的原因是把tab的商品id转成数字id做hash来分配Reduce,所以字符串id的tab日志,都到一个Reduce上了。

解决方法:把数字类型转换成字符串类型

Select * from tab a Left outer join r_auction_auctions b On a.auction_id =cast(b.auction_id as string);

HQL中包含count(distinct)

  如果数据量非常大,执行如select a,count(distinct b) from t group by a;类型的SQL时,会出现数据倾斜的问题。

解决方法:使用sum…group by代替。

select a,sum(1) from (select a, b from t group by a,b) group by a;

Reduce个数太少

  Reduce数太少set mapred.reduce.tasks=800;默认是先设置hive.exec.reducers.bytes.per.reducer这个参数,设置了后Hive会自动计算Reduce的个数,因此两个参数一般不同时使用。

Hive优化join操作

第一:在map端产生join

  map Join的主要意思就是,当链接的两个表是一个比较小的表和一个特别大的表的时候,把比较小的table直接放到内存中去,然后再对比较大的表进行map操作。join就发生在map操作的时候,每当扫描一个大的table中的数据,就要去去查看小表的数据,哪条与之相符,继而进行连接。这里的join并不会涉及reduce操作。map端join的优势就是在于没有shuffle。在实际的应用中设置:set hive.auto.convert.join=true; 这样设置,Hive会自动识别比较小的表,继而用map Join来实现两个表的联合。在本质上map join根本就没有运行MR进程,仅仅是在内存就进行了两个表的联合。

第二:common join

  common join也叫做shuffle join,reduce join操作。这种情况下两个table的大小相当,但是又不是很大的情况下使用的。具体流程就是在map端进行数据的切分,一个block对应一个map操作,然后进行shuffle操作,把对应的block shuffle到reduce端去,再逐个进行联合,这里优势会涉及到数据的倾斜,大幅度的影响性能有可能会运行speculation。

第三:SMBJoin

  SMB是sort merge bucket操作,首先进行排序,继而合并,然后放到所对应的bucket中去,bucket是Hive中和分区表类似的技术,就是按照key进行hash,相同的hash值都放到相同的buck中去。在进行两个表联合的时候。我们首先进行分桶,在join会大幅度的对性能进行优化。也就是说,在进行联合的时候,是table1中的一小部分和table1中的一小部分进行联合,table联合都是等值连接,相同的key都放到了同一个bucket中去了,那么在联合的时候就会大幅度的减小无关项的扫描。

set hive.auto.convert.sortmerge.join=true;set hive.optimize.bucketmapjoin = true;set hive.optimize.bucketmapjoin.sortedmerge = true;set hive.auto.convert.sortmerge.join.noconditionaltask=true;

转载地址:http://eyhli.baihongyu.com/

你可能感兴趣的文章
九度OJ 1471-1480(10/10)
查看>>
九度OJ 1481-1490(7/10)
查看>>
九度OJ 1491-1500(5/10)
查看>>
九度OJ 1501-1510(10/10)
查看>>
业务系统中,报表统计功能如何组织--统计分析模块参考
查看>>
面向数据集成的ETL技术研究
查看>>
DataStage(ETL)技术总结 -- 介绍篇(转载)
查看>>
Greenplum技术浅析--vs oracle RAC
查看>>
框架一
查看>>
Oracle-内存管理解读
查看>>
Oracle-PFILE和SPFILE解读
查看>>
leetcode 13: Roman to Integer
查看>>
a标签中调用js方法
查看>>
js函数中传入的event参数
查看>>
[hive]优化策略
查看>>
c++14现代内存管理
查看>>
右值引用,move语义和完美转发
查看>>
c++使用宏检测类是否包含某个函数或者变量属性
查看>>
CSS3之Transition
查看>>
CSS之media Query
查看>>