Spark案例分析之二次排序
需求分析:
- 按照第一个字段进行分组
- 对分组中的第二字段进行排序(降序)
- 获取每个分组Top N,比如获取前三个值
创建一个文件,上传到hdfs文件系统上,内容如下:
aa 78
bb 98
aa 80
cc 98
aa 69
cc 87
bb 97
cc 86
aa 97
bb 78
bb 34
cc 85
bb 92
cc 72
bb 32
bb 23
上传至hdfs系统中:
[shinelon@hadoop-senior hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -put /opt/datas/spart-sort.txt /user/shinelon/spark/
功能分析: (aa,list(78,80,69,97)) -> (aa,list(69,78,80,97)) -> (aa,list(69,78,80))
1.读取外部数据
val rdd = sc.textFile("hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/spark/grouptop/input/score.input")
2.分割为Array
rdd.map(_.split(" ")).collect
Array(aa, 78)
3.将数组转化为元组
rdd.map(_.split(" ")).map(x => (x(0),x(1))).collect
(aa,78)
4.按照key进行分组
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.collect
(aa,CompactBuffer(78, 80, 69, 97))
5.打印第二个元素
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
x => {
val xx = x._1
val yy = x._2
yy
}
).collect
Iterable[String]
Iterable 方法: def toList: List[A] 返回包含此遍历的迭代器的所有元素的列表
6.将CompactBuffer转换为list
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map(
x => {
val xx = x._1
val yy = x._2
yy.toList
}
).collect
List[String]
List(78, 80, 69, 97)
List 方法:
def sorted[B >: A]: List[A]
根据排序对列表进行排序
7.排序
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map( x => { val xx = x._1 val yy = x._2 yy.toList.sorted } ).collect
List[String]
List(69, 78, 80, 97)
List 方法:
def reverse: List[A]
返回新列表,在相反的顺序元素
8.将排序后的list反转
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map( x => { val xx = x._1 val yy = x._2 yy.toList.sorted.reverse } ).collect
List[String]
List(97, 80, 78, 69)
List 方法:
def take(n: Int): List[A]
返回前n个元素
def takeRight(n: Int): List[A]
返回最后n个元素
9.取list的前三个元素
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map( x => { val xx = x._1 val yy = x._2 yy.toList.sorted.reverse.take(3) } ).collect
10.要求返回的是一个元组对
rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map( x => { val xx = x._1 val yy = x._2 (xx,yy.toList.sorted.reverse.take(3)) } ).collect
11.将结果存储到hdfs文件系统
val groupTopKeyRdd = rdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey.map( x => { val xx = x._1 val yy = x._2 (xx,yy.toList.sorted.reverse.take(3)) } )
groupTopKeyRdd.saveAsTextFile("hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/spark/grouptop/output")
相关文章
- 基于-SLF4J-MDC-机制的日志链路追踪配置属性
ums: # ================ 基于 SLF4J MDC 机制的日志链路追踪配置属性 ================ mdc: # 是否支持基于 SLF4J MDC
- ajax-跨域访问
ajax 跨域访问 <!DOCTYPE html> <html xmlns:th="http://www.w3.org/1999/xhtml"> <head>
- 给第三方登录时用的数据库表-user_connection-与-auth_token-添加-redis-cache
spring: # 设置缓存为 Redis cache: type: redis # redis redis: host: 192.168.88.88 port
- Java动态代理
Jdk动态代理 通过InvocationHandler和Proxy针对实现了接口的类进行动态代理,即必须有相应的接口 应用 public class TestProxy { public
- Java读取classpath中的文件
public void init() { try { //URL url = Thread.currentThread().getContextClassLo
随机推荐
- 基于-SLF4J-MDC-机制的日志链路追踪配置属性
ums: # ================ 基于 SLF4J MDC 机制的日志链路追踪配置属性 ================ mdc: # 是否支持基于 SLF4J MDC
- ajax-跨域访问
ajax 跨域访问 <!DOCTYPE html> <html xmlns:th="http://www.w3.org/1999/xhtml"> <head>
- 给第三方登录时用的数据库表-user_connection-与-auth_token-添加-redis-cache
spring: # 设置缓存为 Redis cache: type: redis # redis redis: host: 192.168.88.88 port
- Java动态代理
Jdk动态代理 通过InvocationHandler和Proxy针对实现了接口的类进行动态代理,即必须有相应的接口 应用 public class TestProxy { public
- Java读取classpath中的文件
public void init() { try { //URL url = Thread.currentThread().getContextClassLo