Spark-数据倾斜解决方案

技术文档网 2021-04-29

一、提高Shuffle操作reduce端的并行度

可以在调用 reduceByKey 或者 groupByKey 的时候传入一个参数指定 reduce 第二个stage 的并行度,这样shuffle之后每个task分配的数据不会很多,基本上解决了数据倾斜的问题,但是治标不治本。

二、Reduce Join 转换为 Map Join

对于join操作,肯定会走shuffle操作,因为我们可以直接在map聚合相同key的数据,避免发生shuffle操作,从而从根源上杜绝了数据倾斜问题。

适用场景: 如果是两个RDD进行join操作,其中一个比较小。因为需要将小的RDD进行broadcast出去,会在每个executor的blockmanager中驻留一份,因此要确保 内存足够去存放那个小的RDD的数据。

实现步骤: 1.将小的RDD做一个广播变量。 2.使用mapToPair函数将相同key的数据聚集到一个task中。

三、随机 key 进行双重聚合

使用与 reduceByKey 或者 groupByKey

实现步骤:

  1. 首先对容易产生数据倾斜的 shuffle 操作的 key 进行标记打散(加前缀或者后缀)。
  2. 实现局部聚合操作,即聚合 key 打散后的数据。
  3. 去除标记(去掉前缀或后缀)。
  4. 进行全局聚合。

四、Sample随机抽取倾斜key进行二次join

实现原理: 随机取样按照key出现的次数获取倾斜key,将一个RDD转换为两个RDD,然后使用这两个RDD和需要join的RDD分别进行join操作,最后使用union 操作合并两个join,解决数据倾斜。

关键点: join操作的两个RDD中有一个RDD的key的数量比较少,但是数据量比较大,适合使用。其核心在于将倾斜的key在join操作时分配到多个task中防止数据倾斜。

实现步骤:

  1. 随机采样一部分RDD
  2. 按照key出现的次数排序,选取出现次数最多的几个key
  3. 分别用抽取的key和剩余的key和其他RDD进行join操作
  4. 将两个join的key进行union操作

优化方案 : 将那个抽取的key加一个100以内的随机数前缀进行打散操作,使得均匀分布 然后再进行join操作,之后去掉前缀。

五、随机数以及扩容表进行join操作

实现步骤:

  1. 对一个RDD进行随机扩容,比如每条数据增加10以内的前缀数据,扩大10倍。
  2. 对另一个RDD加随机数前缀打散。
  3. 两个RDD再进行join操作。

相关文章

  1. 基于-SLF4J-MDC-机制的日志链路追踪配置属性

    ums: # ================ 基于 SLF4J MDC 机制的日志链路追踪配置属性 ================ mdc: # 是否支持基于 SLF4J MDC

  2. ajax-跨域访问

    ajax 跨域访问 <!DOCTYPE html> <html xmlns:th="http://www.w3.org/1999/xhtml"> <head>

  3. 给第三方登录时用的数据库表-user_connection-与-auth_token-添加-redis-cache

    spring: # 设置缓存为 Redis cache: type: redis # redis redis: host: 192.168.88.88 port

  4. Java动态代理

    Jdk动态代理 通过InvocationHandler和Proxy针对实现了接口的类进行动态代理,即必须有相应的接口 应用 public class TestProxy { public

  5. Java读取classpath中的文件

    public void init() { try { //URL url = Thread.currentThread().getContextClassLo

随机推荐

  1. 基于-SLF4J-MDC-机制的日志链路追踪配置属性

    ums: # ================ 基于 SLF4J MDC 机制的日志链路追踪配置属性 ================ mdc: # 是否支持基于 SLF4J MDC

  2. ajax-跨域访问

    ajax 跨域访问 <!DOCTYPE html> <html xmlns:th="http://www.w3.org/1999/xhtml"> <head>

  3. 给第三方登录时用的数据库表-user_connection-与-auth_token-添加-redis-cache

    spring: # 设置缓存为 Redis cache: type: redis # redis redis: host: 192.168.88.88 port

  4. Java动态代理

    Jdk动态代理 通过InvocationHandler和Proxy针对实现了接口的类进行动态代理,即必须有相应的接口 应用 public class TestProxy { public

  5. Java读取classpath中的文件

    public void init() { try { //URL url = Thread.currentThread().getContextClassLo