rdd 内生分组_04、常用RDD操作整理-程序员宅基地

技术标签: rdd 内生分组  

常用Transformation

注:某些函数只有PairRDD只有,而普通的RDD则没有,比如gropuByKey、reduceByKey、sortByKey、join、cogroup等函数要根据Key进行分组或直接操作

RDD基本转换:

RDD[U]

map(f: T => U)

T:原RDD中元素类型

U:新RDD中元素类型

函数将T元素转换为新的U元素

rdd.map(x

=> x + 1)

{1, 2, 3, 3}

=>{2,

3, 4, 4}

RDD[U]

flatMap(f: T => TraversableOnce[U])

TraversableOnce:集合与迭代器的父类

函数将T元素转换为含有新类型U元素的集合,并将这些集合展平(两层转换成一层)后的元素形成新的RDD

rdd.flatMap(x

=> x.to(3))

{1, 2, 3, 3}

=>{1,

2, 3, 2, 3, 3, 3}

RDD[T]

filter(f: T => Boolean)

函数对每个元素进行过滤,通过的元素形成新的RDD

rdd.filter(x

=> x != 1)

{1, 2, 3, 3}

=>{2,

3, 3}

RDD[T]

distinct()

去重

rdd.distinct()

{1, 2, 3, 3}

=>{1,

2, 3}

RDD[U]

mapPartitions(f: Iterator[T] =>

Iterator[U])

与map一样,只是转换时是以分区为单位,将一个分区所有元素包装成Iterator一次性传入函数进行处理,而不像map函数那样每个元素都会调用一个函数,即这里有几个分区则才调用几次函数

假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次

valarr= Array(1,2,3,4,5)

valrdd=sc.parallelize(arr,2)

rdd.mapPartitions((it:

Iterator[Int]) => {varl = List[Int]();

it.foreach((e: Int) => l = e *2:: l); l.iterator })

=>{2,4,6,8,10}

RDD[U]

mapPartitionsWithIndex(f: (Int, Iterator[T]) => Iterator[U])

与mapPartitions类似,不同的时函数多了个分区索引的参数

RDD[T]

union(other: RDD[T])

两个RDD并集,包括重复的元素

rdd.union(otherRdd)

{ 1, 2, 2, 3, 3}

{ 3, 4, 5}

=>{1,

2, 2, 3, 3, 3, 4, 5}

RDD[T]

intersection(other: RDD[T])

两个RDD交集

rdd.intersection(otherRdd)

{ 1, 2, 2, 3, 3}

{ 3, 4, 5}

=>{3}

RDD[T]

subtract(other: RDD[T])

两个RDD相减

rdd.subtract(otherRdd)

{ 1, 2, 2, 3, 3}

{ 3, 4, 5}

=>{1,

2, 2}

RDD[(T,

U)] cartesian(other: RDD[U])

两个RDD相减笛卡儿积

rdd.cartesian(otherRdd)

{ 1, 2 }

{ 3, 4}

=>{(1,3),(1,4),(2,3),(2,4)}

RDD[T]

sortBy( f: (T) => K, ascending:

Boolean,numPartitions: Int)

根据转换后的值进行排序,传入的是一个(T) => K转换函数

rdd.sortBy(_._2,

false, 1)

这里根据value进行降序排序

{("leo", 65), ("tom", 50), ("marry", 100),

("jack", 80)}

=>{("marry",

100),("jack", 80),("leo", 65), ("leo", 65)}

RDD[Array[T]]

glom()

将RDD的每个分区中的类型为T的元素转换换数组Array[T]

valarr= Array(1,2,3,4,5)

valrdd=sc.parallelize(arr,2)

valarrRDD=rdd.glom()arrRDD.foreach {

(arr: Array[Int]) => { println("[ "+ arr.mkString("

") +" ]"); } }

=>[ 1 2 ],[ 3 4 5 ]

键-值RDD转换:

RDD[(K,

U)] mapValues[U](f: V => U)

K:key类型

V:value类型

将value转换为新的U元素,Key不变

rdd.mapValues(_

+ 1)

{"class1", 80), ("class2", 70)}

=>{"class1",

81), ("class2", 71)}

RDD[(K,

U)] flatMapValues(f: V =>

TraversableOnce[U])

对[K,V]型数据中的V值flatmap操作

rdd.flatMapValues(_.toCharArray())

{ (1, "ab"), (2, "bc")}

=>{(1,

'a'), (1, 'b'), (2, 'b'), (2, 'c')}

RDD[(K,

Iterable[V])] groupByKey()

根据key进行分组,同一组的元素组成Iterable,并以(key, Iterable)元组类型为元素作为新的RDD返回

rdd.groupByKey()

{("class1", 80), ("class2", 75),

("class1", 90), ("class2", 60)}

=>{("class1",[80,90]),("class2",[75,60])}

RDD[(K,

Iterable[T])] groupBy(f: T => K)

T:原RDD元素类型

K:新RDD中元素Key的类型

根据函数将元素T映射成相应K后,以此K进行分组

rdd.groupBy({

case 1 => 1; case 2 => 2; case "二" => 2 })

{ 1, 2, "二"

}

=>{(1,[1]),(2,[2,

"二"])}

RDD[(K,

V)] reduceByKey(func: (V, V) => V)

先根据key进行分组,再对同一组中的的value进行reduce操作:第一次调用函数时传入的是两个Key所对应的value,从第二次往后,传入的两个参数中的第一个为上次函数计算的结果,第二个参数为其它Key的value

rdd.

reduceByKey(_ + _)

{("class1", 80), ("class2", 75),

("class1", 90), ("class2", 60)}

=>{("class1",

170),("class2", 135)}

RDD[(K,

V)] sortByKey()

根据key的大小进行排序(注:并不是先以Key进行分组,再对组类进行排序,而是直接根据Key的值进行排序)

rdd.sortByKey(false)

{(65, "leo"), (50, "tom"),(100,

"marry"), (85, "jack")}

=>{(100,

"marry"),(85, "jack"),(65, "eo"),(50,

"tom")}

RDD[(K,

V)] foldByKey(zeroValue: V)(func: (V,

V) => V):

zeroValue:每个分区相同Key累计时的初始值,以及不同分区相同Key合并时的初始值

e.g., Nilfor list concatenation, 0

for addition, or 1 for multiplication

对每个value先进行func操作,且funcfoldByKey函数是通过调用函数实现的。

zeroVale:对V进行初始化,实际上是通过CombineByKey的createCombiner实现的V =>

(zeroValue,V),再通过func函数映射成新的值,即func(zeroValue,V)

func: Value将通过func函数按Key值进行合并(实际上是通过CombineByKey的mergeValue,mergeCombiners函数实现的,只不过在这里,这两个函数是相同的)

valpeople= List(("Mobin",1), ("Lucy",2), ("Amy",3), ("Amy",4), ("Lucy",5))

valrdd=sc.parallelize(people,2)

valfoldByKeyRDD=rdd.foldByKey(10)((v1, v2)

=> { println(v1 +" + "+ v2 +" =

"+ (v1 + v2)); v1 + v2 })//先对每个V都加10,再对相同Key的value值相加

foldByKeyRDD.foreach(println)

//处理第一个分区数据

10+ 1 = 11 // ("Mobin",

1)

10+ 2 = 12 // ("Lucy",

2)

=====================

//处理第二个分区数据

10+ 3 = 13 // ("Amy", 3)

13 + 4

= 17 // ("Amy", 4)同分区同Key的Val先合并

10+ 5 = 15 // ("Lucy",

5)

=====================

//将不同分区相同Key的Value合并起来

12 +

15 = 27 // "Lucy"跨分区,所以需合并

(Amy,17)

(Mobin,11)

(Lucy,27)

RDD[(K,

(V, Option[W]))] leftOuterJoin[W](other:

RDD[(K, W)]):

左外连接,包含左RDD的所有数据,如果右边没有与之匹配的用None表示

valarr= List(("A",1), ("A",2), ("B",1))

valarr1= List(("A","A1"), ("A","A2"))

valrdd=sc.parallelize(arr,2)

valrdd1=sc.parallelize(arr1,2)

valleftOutJoinRDD=rdd.leftOuterJoin(rdd1)

leftOutJoinRDD.foreach(println)

=>

(B,(1,None))

(A,(1,Some(A1)))

(A,(1,Some(A2)))

(A,(2,Some(A1)))

(A,(2,Some(A2)))

RDD[(K,

(Option[V], W))] rightOuterJoin[W](other:

RDD[(K, W)])

右外连接,包含右RDD的所有数据,如果左边没有与之匹配的用None表示

valarr= List(("A",1), ("A",2))

valarr1= List(("A","A1"), ("A","A2"), ("B",1))

valrdd=sc.parallelize(arr,2)

valrdd1=sc.parallelize(arr1,2)

valleftOutJoinRDD=rdd.rightOuterJoin(rdd1)

leftOutJoinRDD.foreach(println)

(B,(None,1))

(A,(Some(1),A1))

(A,(Some(1),A2))

(A,(Some(2),A1))

(A,(Some(2),A2))

RDD[(K,

(V, W))] join(other: RDD[(K, W))

W:另一RDD元素的value的类型

对两个包含对的RDD根据key进行join操作,返回类型

rdd.join(otherRdd)

{(1, "leo"),(2, "jack"),(3, "tom")}

{(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)}

=>{(1,("leo",100)),(1,("leo",70)),(2,

("jack",90),(2, ("jack",80),(3, ("tom",60),(3,

("tom",50))}

RDD[(K,

(Iterable[V], Iterable[W]))] cogroup(other:

RDD[(K, W)])

同join,也是根据key进行join,只不过相同key的value分别存放到Iterable中

rdd.cogroup(otherRdd)

{(1, "leo"),(2, "jack"),(3, "tom")}

{(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)}

=>{(1,(["leo"],[100,70])),(2,

(["jack"],[90,80])),(3,

(["tom","lily"],[60,50]))}

常用Action

T reduce(f: (T, T) => T)

对所有元素进行reduce操作

rdd.reduce(_

+ _)

{1, 2, 2, 3, 3, 3}

=>14

Array[T]

collect()

将RDD中所有元素返回到一个数组里

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.collect()

{1, 2, 3, 3}

=>[1,

2, 3, 3]

Map[K,

V] collectAsMap()

作用于K-V类型的RDD上,作用与collect不同的是collectAsMap函数不包含重复的key,对于重复的key,后面的元素覆盖前面的元素

rdd.collectAsMap()

{ ("leo", 65), ("tom", 50), ("tom",

100)}

=>{

("leo", 65), ("tom", 100)}

Long count()

统计RDD中的元素个数

rdd.count()

{1, 2, 3, 3}

=>4

Map[T,

Long] countByValue()

各元素在RDD中出现的次数

注意:This method should only

be used if the resulting map is expected to be small, as the whole thing is

loaded into the driver's memory.

To handle

very large results, consider usingrdd.map(x => (x, 1L)).reduceByKey(_ + _), which

returns anRDD[T, Long]instead of amap.

rdd.countByValue()

{1, 2, 3, 3}

=>Map(1

-> 1, 3 -> 2, 2 -> 1)

Map[K,

Long] countByKey()

先根据Key进行分组,再对每组里的value分别进行计数统计

注意:This method should only

be used if the resulting map is expected to be small, as the whole thing is

loaded into the driver's memory.

To handle

very large results, consider usingrdd.mapValues(_ => 1L).reduceByKey(_ + _), whichreturns

anRDD[T, Long]instead of amap.

{ ("leo", 65), ("tom", 50), ("tom", 100),

("tom", 100) }

=>Map(leo

-> 1, tom -> 3)

T first()

取第一个元素,实质上是调用take(1)实现的

rdd.first()

{3, 2,

1, 4}

=>3

Array[T]

take(num: Int)

从RDD中返回前num个元素

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.take(2)

{3, 2, 1, 4}

=>[3,

2]

Array[T]

top(num: Int ) (implicit ord:

Ordering[T])

如果没有传递ord参数,则使用隐式参数,且提供的默认隐式参数为升序排序,可以传递一个自定义的Ordering来覆盖默认提供。top实现是将Ordering反序后再调用takeOrdered的:takeOrdered(num)(ord.reverse)

默认从RDD中返回最最大的num个元素

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.top(2)

{3, 2, 1, 4}

=>[4,

3]

Array[T]

takeOrdered(num: Int)(implicit ord:

Ordering[T])

如果没有传递ord参数,则使用隐式参数,且提供的默认隐式参数为升序排序,可以传递一个自定义的Ordering来覆盖默认提供

与top相反,默认取的是前面最小的num个元素

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.takeOrdered(2)(myOrdering)

{3, 2, 1, 4}

=>[1,

2]

T fold(zeroValue: T)(op: (T, T) => T)

zeroValue:为每个分区累计的初始值,以及不同分区累计的初始值

e.g., Nilfor list concatenation, 0

for addition, or 1 for multiplication

和reduce()一

样, 但 是 需 要

提供初始值。注意:每个分区应用op函数时,都会以zeroValue为初始值进行计算,然后将每个分区的结果合并时,还是会以zeroValue为初始值进行合并计算

valarr= Array(1,2,3,4,5);

valrdd=sc.parallelize(arr,2)//分成两分区[1,

2] [3, 4, 5]

println(rdd.fold(10)((v1, v2)

=> { println(v1 +" + "+ v2 +" =

"+ (v1 + v2)); v1 + v2 }))

//处理第一个分区数据

10+ 1 = 11

11 + 2

= 13 //从第二个元素起,每分区内先累加

=====================

//处理第一个分区数据

10+ 3 = 13

13 + 4

= 17 //从第二个元素起,每分区内先累加

17 + 5

= 22 //从第二个元素起,每分区内先累加

=====================

//将各分区汇总起来

10+ 13 = 23 //汇总时还会使用初始值来作起始

23 +

22 = 45

45

U aggregate (zeroValue: U)(seqOp: (U, T) => U,

combOp: (U, U) => U)

初始值类型与原始数据类型可以不同,但初始值类型决定了返回值类型

与fold一样,计算时需要提供初始值,不同的是,分区的计算函数(seqOp)与分区合并计算函数(combOp)是不同的,但fold分区计算函数与分区合并计算函数是同一函数

rdd.fold(5)(_

+ _, _ + _)

val

arr = Array(1, 2, 3, 4);

val

rdd = sc.parallelize(arr, 2)

println(rdd.aggregate(5)(

(v1,

v2) => { println("v1 = " + v1 + " ; v2 = " + v2); v1 +

v2 },

(v1,

v2) => { println("v1 = " + v1 + " ; v2 = " + v2); v1 +

v2 })

)

过程与结果与上面的fold函数一样

Unit saveAsTextFile(path: String)

将RDD元素保存到文件中,对每个元素调用toString方法

Unit foreach(f: T => Unit)

遍历RDD中的每个元素

rdd.foreach(println(_))

comineByKey

defcombineByKey[C](

createCombiner: V => C,

mergeValue: (C, V) => C,

mergeCombiners: (C, C) => C,

partitioner: Partitioner,

mapSideCombine: Boolean =true,

serializer: Serializer =null): RDD[(K, C)]

createCombiner:在第一次遇到Key时创建组合器函数,将RDD数据集中的V类型值转换C类型值(V => C),

mergeValue:合并值函数,再次遇到相同的Key时,将createCombiner道理的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=>C

mergeCombiners:合并组合器函数,将C类型值两两合并成一个C类型值

partitioner:使用已有的或自定义的分区函数,默认是HashPartitioner

mapSideCombine:是否在map端进行Combine操作,默认为true

例:统计男性和女生的个数,并以(性别,(名字,名字....),个数)的形式输出

objectCombineByKey {

defmain(args:

Array[String]) {

valconf=newSparkConf().setMaster("local").setAppName("combinByKey")

valsc=newSparkContext(conf)

valpeople= List(("male","Mobin"), ("male","Kpop"), ("female","Lucy"), ("male","Lufei"), ("female","Amy"))

valrdd=sc.parallelize(people)

valcombinByKeyRDD=rdd.combineByKey(

(x: String) => (List(x),1),

(peo: (List[String], Int), x: String) => (x :: peo._1, peo._2+1),

(sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1::: sex2._1, sex1._2+ sex2._2))

combinByKeyRDD.foreach(println)

sc.stop()

}

}

输出:

(male,(List(Lufei, Kpop,

Mobin),3))

(female,(List(Amy,

Lucy),2))

计算过程:

Partition1:

K="male"  -->

("male","Mobin")  -->

createCombiner("Mobin") =>  peo1 = (

List("Mobin") , 1 )

K="male"  -->

("male","Kpop")  -->

mergeValue(peo1,"Kpop") =>  peo2 = (

"Kpop"  ::  peo1_1 , 1 + 1 )//Key相同调用mergeValue函数对值进行合并

K="female"  -->

("female","Lucy")  -->

createCombiner("Lucy") =>  peo3 = (

List("Lucy") , 1 )

Partition2:

K="male"  -->

("male","Lufei")  -->

createCombiner("Lufei") =>  peo4 = (  List("Lufei")

, 1 )

K="female"  -->

("female","Amy")  -->

createCombiner("Amy") =>  peo5 = (

List("Amy") , 1 )

Merger Partition:

K="male" --> mergeCombiners(peo2,peo4) =>

(List(Lufei,Kpop,Mobin))

K="female" --> mergeCombiners(peo3,peo5)

=> (List(Amy,Lucy))

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_39646706/article/details/111483055

智能推荐

彻底扒光 通过智能路由器拆解看其本质-程序员宅基地

文章浏览阅读1.7k次。可以看到很多联发科的MT芯片摘自:https://net.zol.com.cn/531/5312999.html彻底扒光 通过智能路由器拆解看其本质2015-07-23 00:40:00[中关村在线 原创] 作者:陈赫|责编:白宁收藏文章 分享到 评论(24)关注智能路由器拆解的朋友们注意啦!我们已经将这五款产品彻底扒开,将主板的真容展现在了大家的眼前。网友们可以看见这些智能路由器主板的做工和用料,我们还为网友们展示了主要的电子元器件,供大家品评观赏。..._路由器拆解

Java--深入JDK和hotspot底层源码剖析Thread的run()、start()方法执行过程_jdk的源码hotspot跟jdk是分开的-程序员宅基地

文章浏览阅读2.1k次,点赞101次,收藏78次。【学习背景】今天主要是来了解Java线程Thread中的run()、start()两个方法的执行有哪些区别,会给出一个简单的测试代码样例,快速理解两者的区别,再从源码层面去追溯start()底层是如何最终调用Thread#run()方法的,个人觉得这样的学习不论对面试,还是实际编程来说都是比较有帮助的。进入正文~学习目录一、代码测试二、源码分析2.1 run()方法2.2 start()方法三、使用总结一、代码测试执行Thread的run()、start()方法的测试代码如下:public_jdk的源码hotspot跟jdk是分开的

透视俄乌网络战之一:数据擦除软件_俄乌网络战观察(一)-程序员宅基地

文章浏览阅读4.4k次,点赞90次,收藏85次。俄乌冲突中,各方势力通过数据擦除恶意软件破坏关键信息基础设施计算机的数据,达到深度致瘫的效果,同时窃取重要敏感信息。_俄乌网络战观察(一)

Maven私服仓库配置-Nexus详解_nexus maven-程序员宅基地

文章浏览阅读1.7w次,点赞23次,收藏139次。Maven 私服是一种特殊的Maven远程仓库,它是架设在局域网内的仓库服务,用来代理位于外部的远程仓库(中央仓库、其他远程公共仓库)。当然也并不是说私服只能建立在局域网,也有很多公司会直接把私服部署到公网,具体还是得看公司业务的性质是否是保密的等等,因为局域网的话只能在公司用,部署到公网的话员工在家里也可以办公使用。_nexus maven

基于AI的计算机视觉识别在Java项目中的使用 (四) —— 准备训练数据_java ocr ai识别训练-程序员宅基地

文章浏览阅读934次。我先用所有的样本数据对模型做几轮初步训练,让深度神经模型基本拟合(数万条记录的训练集,识别率到99%左右),具备初步的识别能力,这时的模型就是“直男”。相较于训练很多轮、拟合程度很高的“油腻男”,它的拟合程度较低,还是“直男愣头青”。..............._java ocr ai识别训练

hibernate 数据库类型 date没有时分秒解决_hibernate解析時間只有年月日沒有時分秒-程序员宅基地

文章浏览阅读688次。一、问题现象:  在数据库表中日期字段中存的日期光有年月日,没有时分秒。二、产生原因:三 解决办法   检查表的相应映射xml文件。 <property name="operateDate" type="Date">如果同上面所写,那问题出在 type类型上了正确写法 :<property name="operateDate" type="java.util..._hibernate解析時間只有年月日沒有時分秒

随便推点

springbbot运行无法编译成功,找不到jar包报错:Error:(3, 46) java: 程序包org.springframework.context.annotation不存在-程序员宅基地

文章浏览阅读1k次,点赞2次,收藏2次。文章目录问题描述:解决方案:问题描述:提示:idea springbbot运行无法编译成功,找不到jar包报错E:\ideaProject\demokkkk\src\main\java\com\example\demo\config\WebSocketConfig.javaError:(3, 46) java: 程序包org.springframework.context.annotation不存在Error:(4, 46) java: 程序包org.springframework.conte_error:(3, 46) java: 程序包org.springframework.context.annotation不存在

react常见面试题_recate面试-程序员宅基地

文章浏览阅读6.4k次,点赞6次,收藏36次。1、redux中间件中间件提供第三方插件的模式,自定义拦截 action -&gt; reducer 的过程。变为 action -&gt; middlewares -&gt; reducer 。这种机制可以让我们改变数据流,实现如异步 action ,action 过滤,日志输出,异常报告等功能。常见的中间件:redux-logger:提供日志输出redux-thunk:处理异步操作..._recate面试

交叉编译jpeglib遇到的问题-程序员宅基地

文章浏览阅读405次。由于要在开发板中加载libjpeg,不能使用gcc编译的库文件给以使用,需要自己配置使用另外的编译器编译该库文件。/usr/bin/ld:.libs/jaricom.o:RelocationsingenericELF(EM:40)/usr/bin/ld:.libs/jaricom.o:RelocationsingenericELF(EM:40)...._jpeg_utils.lo: relocations in generic elf (em: 8) error adding symbols: file

【办公类-22-06】周计划系列(1)“信息窗” (2024年调整版本)-程序员宅基地

文章浏览阅读578次,点赞10次,收藏17次。【办公类-22-06】周计划系列(1)“信息窗” (2024年调整版本)

SEO优化_百度seo resetful-程序员宅基地

文章浏览阅读309次。SEO全称为Search Engine Optimization,中文解释为搜索引擎优化。一般指通过对网站内部调整优化及站外优化,使网站满足搜索引擎收录排名需求,在搜索引擎中提高关键词排名,从而把精准..._百度seo resetful

回归预测 | Matlab实现HPO-ELM猎食者算法优化极限学习机的数据回归预测_猎食者优化算法-程序员宅基地

文章浏览阅读438次。回归预测 | Matlab实现HPO-ELM猎食者算法优化极限学习机的数据回归预测_猎食者优化算法