Spark案例分析之二次排序

技术文档网 2021-04-29

需求分析:

  1. 按照第一个字段进行分组
  2. 对分组中的第二字段进行排序(降序)
  3. 获取每个分组Top N,比如获取前三个值

创建一个文件,上传到hdfs文件系统上,内容如下:

aa 78

bb 98

aa 80

cc 98

aa 69

cc 87

bb 97

cc 86

aa 97

bb 78

bb 34

cc 85

bb 92

cc 72

bb 32

bb 23

上传至hdfs系统中: [shinelon@hadoop-senior hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -put /opt/datas/spart-sort.txt /user/shinelon/spark/

功能分析: (aa,list(78,80,69,97)) -> (aa,list(69,78,80,97)) -> (aa,list(69,78,80))

1.读取外部数据

val rdd = sc.textFile("hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/spark/grouptop/input/score.input")

2.分割为Array

rdd.map(_.split(" ")).collect

Array(aa, 78)

3.将数组转化为元组

rdd.map(_.split(" ")).map(x => (x(0),x(1))).collect

(aa,78)

4.按照key进行分组

rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.collect

(aa,CompactBuffer(78, 80, 69, 97))

5.打印第二个元素

rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map( x => { val xx = x._1 val yy = x._2 yy
} ).collect Iterable[String]

Iterable 方法: def toList: List[A] 返回包含此遍历的迭代器的所有元素的列表

6.将CompactBuffer转换为list

rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map( x => { val xx = x._1 val yy = x._2 yy.toList
} ).collect

List[String]

List(78, 80, 69, 97)

List 方法:

def sorted[B >: A]: List[A]

根据排序对列表进行排序

7.排序

rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map( x => { val xx = x._1 val yy = x._2 yy.toList.sorted } ).collect

List[String]

List(69, 78, 80, 97)

List 方法:

def reverse: List[A]

返回新列表,在相反的顺序元素

8.将排序后的list反转

rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map( x => { val xx = x._1 val yy = x._2 yy.toList.sorted.reverse } ).collect

List[String]

List(97, 80, 78, 69)

List 方法:

def take(n: Int): List[A]

返回前n个元素

def takeRight(n: Int): List[A]

返回最后n个元素

9.取list的前三个元素

rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map( x => { val xx = x._1 val yy = x._2 yy.toList.sorted.reverse.take(3) } ).collect

10.要求返回的是一个元组对

rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map( x => { val xx = x._1 val yy = x._2 (xx,yy.toList.sorted.reverse.take(3)) } ).collect

11.将结果存储到hdfs文件系统

val groupTopKeyRdd = rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map( x => { val xx = x._1 val yy = x._2 (xx,yy.toList.sorted.reverse.take(3)) } )

groupTopKeyRdd.saveAsTextFile("hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/spark/grouptop/output")

相关文章

  1. 基于-SLF4J-MDC-机制的日志链路追踪配置属性

    ums: # ================ 基于 SLF4J MDC 机制的日志链路追踪配置属性 ================ mdc: # 是否支持基于 SLF4J MDC

  2. ajax-跨域访问

    ajax 跨域访问 <!DOCTYPE html> <html xmlns:th="http://www.w3.org/1999/xhtml"> <head>

  3. 给第三方登录时用的数据库表-user_connection-与-auth_token-添加-redis-cache

    spring: # 设置缓存为 Redis cache: type: redis # redis redis: host: 192.168.88.88 port

  4. Java动态代理

    Jdk动态代理 通过InvocationHandler和Proxy针对实现了接口的类进行动态代理,即必须有相应的接口 应用 public class TestProxy { public

  5. Java读取classpath中的文件

    public void init() { try { //URL url = Thread.currentThread().getContextClassLo

随机推荐

  1. 基于-SLF4J-MDC-机制的日志链路追踪配置属性

    ums: # ================ 基于 SLF4J MDC 机制的日志链路追踪配置属性 ================ mdc: # 是否支持基于 SLF4J MDC

  2. ajax-跨域访问

    ajax 跨域访问 <!DOCTYPE html> <html xmlns:th="http://www.w3.org/1999/xhtml"> <head>

  3. 给第三方登录时用的数据库表-user_connection-与-auth_token-添加-redis-cache

    spring: # 设置缓存为 Redis cache: type: redis # redis redis: host: 192.168.88.88 port

  4. Java动态代理

    Jdk动态代理 通过InvocationHandler和Proxy针对实现了接口的类进行动态代理,即必须有相应的接口 应用 public class TestProxy { public

  5. Java读取classpath中的文件

    public void init() { try { //URL url = Thread.currentThread().getContextClassLo