Spark-数据倾斜解决方案
一、提高Shuffle操作reduce端的并行度
可以在调用 reduceByKey 或者 groupByKey 的时候传入一个参数指定 reduce 第二个stage 的并行度,这样shuffle之后每个task分配的数据不会很多,基本上解决了数据倾斜的问题,但是治标不治本。
二、Reduce Join 转换为 Map Join
对于join操作,肯定会走shuffle操作,因为我们可以直接在map聚合相同key的数据,避免发生shuffle操作,从而从根源上杜绝了数据倾斜问题。
适用场景: 如果是两个RDD进行join操作,其中一个比较小。因为需要将小的RDD进行broadcast出去,会在每个executor的blockmanager中驻留一份,因此要确保 内存足够去存放那个小的RDD的数据。
实现步骤: 1.将小的RDD做一个广播变量。 2.使用mapToPair函数将相同key的数据聚集到一个task中。
三、随机 key 进行双重聚合
使用与 reduceByKey 或者 groupByKey
实现步骤:
- 首先对容易产生数据倾斜的 shuffle 操作的 key 进行标记打散(加前缀或者后缀)。
- 实现局部聚合操作,即聚合 key 打散后的数据。
- 去除标记(去掉前缀或后缀)。
- 进行全局聚合。
四、Sample随机抽取倾斜key进行二次join
实现原理: 随机取样按照key出现的次数获取倾斜key,将一个RDD转换为两个RDD,然后使用这两个RDD和需要join的RDD分别进行join操作,最后使用union 操作合并两个join,解决数据倾斜。
关键点: join操作的两个RDD中有一个RDD的key的数量比较少,但是数据量比较大,适合使用。其核心在于将倾斜的key在join操作时分配到多个task中防止数据倾斜。
实现步骤:
- 随机采样一部分RDD
- 按照key出现的次数排序,选取出现次数最多的几个key
- 分别用抽取的key和剩余的key和其他RDD进行join操作
- 将两个join的key进行union操作
优化方案 : 将那个抽取的key加一个100以内的随机数前缀进行打散操作,使得均匀分布 然后再进行join操作,之后去掉前缀。
五、随机数以及扩容表进行join操作
实现步骤:
- 对一个RDD进行随机扩容,比如每条数据增加10以内的前缀数据,扩大10倍。
- 对另一个RDD加随机数前缀打散。
- 两个RDD再进行join操作。
相关文章
- 基于-SLF4J-MDC-机制的日志链路追踪配置属性
ums: # ================ 基于 SLF4J MDC 机制的日志链路追踪配置属性 ================ mdc: # 是否支持基于 SLF4J MDC
- ajax-跨域访问
ajax 跨域访问 <!DOCTYPE html> <html xmlns:th="http://www.w3.org/1999/xhtml"> <head>
- 给第三方登录时用的数据库表-user_connection-与-auth_token-添加-redis-cache
spring: # 设置缓存为 Redis cache: type: redis # redis redis: host: 192.168.88.88 port
- Java动态代理
Jdk动态代理 通过InvocationHandler和Proxy针对实现了接口的类进行动态代理,即必须有相应的接口 应用 public class TestProxy { public
- Java读取classpath中的文件
public void init() { try { //URL url = Thread.currentThread().getContextClassLo
随机推荐
- 基于-SLF4J-MDC-机制的日志链路追踪配置属性
ums: # ================ 基于 SLF4J MDC 机制的日志链路追踪配置属性 ================ mdc: # 是否支持基于 SLF4J MDC
- ajax-跨域访问
ajax 跨域访问 <!DOCTYPE html> <html xmlns:th="http://www.w3.org/1999/xhtml"> <head>
- 给第三方登录时用的数据库表-user_connection-与-auth_token-添加-redis-cache
spring: # 设置缓存为 Redis cache: type: redis # redis redis: host: 192.168.88.88 port
- Java动态代理
Jdk动态代理 通过InvocationHandler和Proxy针对实现了接口的类进行动态代理,即必须有相应的接口 应用 public class TestProxy { public
- Java读取classpath中的文件
public void init() { try { //URL url = Thread.currentThread().getContextClassLo