rdd 内生分组_04、常用RDD操作整理-程序员宅基地

技术标签: rdd 内生分组  

常用Transformation

注:某些函数只有PairRDD只有,而普通的RDD则没有,比如gropuByKey、reduceByKey、sortByKey、join、cogroup等函数要根据Key进行分组或直接操作

RDD基本转换:

RDD[U]

map(f: T => U)

T:原RDD中元素类型

U:新RDD中元素类型

函数将T元素转换为新的U元素

rdd.map(x

=> x + 1)

{1, 2, 3, 3}

=>{2,

3, 4, 4}

RDD[U]

flatMap(f: T => TraversableOnce[U])

TraversableOnce:集合与迭代器的父类

函数将T元素转换为含有新类型U元素的集合,并将这些集合展平(两层转换成一层)后的元素形成新的RDD

rdd.flatMap(x

=> x.to(3))

{1, 2, 3, 3}

=>{1,

2, 3, 2, 3, 3, 3}

RDD[T]

filter(f: T => Boolean)

函数对每个元素进行过滤,通过的元素形成新的RDD

rdd.filter(x

=> x != 1)

{1, 2, 3, 3}

=>{2,

3, 3}

RDD[T]

distinct()

去重

rdd.distinct()

{1, 2, 3, 3}

=>{1,

2, 3}

RDD[U]

mapPartitions(f: Iterator[T] =>

Iterator[U])

与map一样,只是转换时是以分区为单位,将一个分区所有元素包装成Iterator一次性传入函数进行处理,而不像map函数那样每个元素都会调用一个函数,即这里有几个分区则才调用几次函数

假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次

valarr= Array(1,2,3,4,5)

valrdd=sc.parallelize(arr,2)

rdd.mapPartitions((it:

Iterator[Int]) => {varl = List[Int]();

it.foreach((e: Int) => l = e *2:: l); l.iterator })

=>{2,4,6,8,10}

RDD[U]

mapPartitionsWithIndex(f: (Int, Iterator[T]) => Iterator[U])

与mapPartitions类似,不同的时函数多了个分区索引的参数

RDD[T]

union(other: RDD[T])

两个RDD并集,包括重复的元素

rdd.union(otherRdd)

{ 1, 2, 2, 3, 3}

{ 3, 4, 5}

=>{1,

2, 2, 3, 3, 3, 4, 5}

RDD[T]

intersection(other: RDD[T])

两个RDD交集

rdd.intersection(otherRdd)

{ 1, 2, 2, 3, 3}

{ 3, 4, 5}

=>{3}

RDD[T]

subtract(other: RDD[T])

两个RDD相减

rdd.subtract(otherRdd)

{ 1, 2, 2, 3, 3}

{ 3, 4, 5}

=>{1,

2, 2}

RDD[(T,

U)] cartesian(other: RDD[U])

两个RDD相减笛卡儿积

rdd.cartesian(otherRdd)

{ 1, 2 }

{ 3, 4}

=>{(1,3),(1,4),(2,3),(2,4)}

RDD[T]

sortBy( f: (T) => K, ascending:

Boolean,numPartitions: Int)

根据转换后的值进行排序,传入的是一个(T) => K转换函数

rdd.sortBy(_._2,

false, 1)

这里根据value进行降序排序

{("leo", 65), ("tom", 50), ("marry", 100),

("jack", 80)}

=>{("marry",

100),("jack", 80),("leo", 65), ("leo", 65)}

RDD[Array[T]]

glom()

将RDD的每个分区中的类型为T的元素转换换数组Array[T]

valarr= Array(1,2,3,4,5)

valrdd=sc.parallelize(arr,2)

valarrRDD=rdd.glom()arrRDD.foreach {

(arr: Array[Int]) => { println("[ "+ arr.mkString("

") +" ]"); } }

=>[ 1 2 ],[ 3 4 5 ]

键-值RDD转换:

RDD[(K,

U)] mapValues[U](f: V => U)

K:key类型

V:value类型

将value转换为新的U元素,Key不变

rdd.mapValues(_

+ 1)

{"class1", 80), ("class2", 70)}

=>{"class1",

81), ("class2", 71)}

RDD[(K,

U)] flatMapValues(f: V =>

TraversableOnce[U])

对[K,V]型数据中的V值flatmap操作

rdd.flatMapValues(_.toCharArray())

{ (1, "ab"), (2, "bc")}

=>{(1,

'a'), (1, 'b'), (2, 'b'), (2, 'c')}

RDD[(K,

Iterable[V])] groupByKey()

根据key进行分组,同一组的元素组成Iterable,并以(key, Iterable)元组类型为元素作为新的RDD返回

rdd.groupByKey()

{("class1", 80), ("class2", 75),

("class1", 90), ("class2", 60)}

=>{("class1",[80,90]),("class2",[75,60])}

RDD[(K,

Iterable[T])] groupBy(f: T => K)

T:原RDD元素类型

K:新RDD中元素Key的类型

根据函数将元素T映射成相应K后,以此K进行分组

rdd.groupBy({

case 1 => 1; case 2 => 2; case "二" => 2 })

{ 1, 2, "二"

}

=>{(1,[1]),(2,[2,

"二"])}

RDD[(K,

V)] reduceByKey(func: (V, V) => V)

先根据key进行分组,再对同一组中的的value进行reduce操作:第一次调用函数时传入的是两个Key所对应的value,从第二次往后,传入的两个参数中的第一个为上次函数计算的结果,第二个参数为其它Key的value

rdd.

reduceByKey(_ + _)

{("class1", 80), ("class2", 75),

("class1", 90), ("class2", 60)}

=>{("class1",

170),("class2", 135)}

RDD[(K,

V)] sortByKey()

根据key的大小进行排序(注:并不是先以Key进行分组,再对组类进行排序,而是直接根据Key的值进行排序)

rdd.sortByKey(false)

{(65, "leo"), (50, "tom"),(100,

"marry"), (85, "jack")}

=>{(100,

"marry"),(85, "jack"),(65, "eo"),(50,

"tom")}

RDD[(K,

V)] foldByKey(zeroValue: V)(func: (V,

V) => V):

zeroValue:每个分区相同Key累计时的初始值,以及不同分区相同Key合并时的初始值

e.g., Nilfor list concatenation, 0

for addition, or 1 for multiplication

对每个value先进行func操作,且funcfoldByKey函数是通过调用函数实现的。

zeroVale:对V进行初始化,实际上是通过CombineByKey的createCombiner实现的V =>

(zeroValue,V),再通过func函数映射成新的值,即func(zeroValue,V)

func: Value将通过func函数按Key值进行合并(实际上是通过CombineByKey的mergeValue,mergeCombiners函数实现的,只不过在这里,这两个函数是相同的)

valpeople= List(("Mobin",1), ("Lucy",2), ("Amy",3), ("Amy",4), ("Lucy",5))

valrdd=sc.parallelize(people,2)

valfoldByKeyRDD=rdd.foldByKey(10)((v1, v2)

=> { println(v1 +" + "+ v2 +" =

"+ (v1 + v2)); v1 + v2 })//先对每个V都加10,再对相同Key的value值相加

foldByKeyRDD.foreach(println)

//处理第一个分区数据

10+ 1 = 11 // ("Mobin",

1)

10+ 2 = 12 // ("Lucy",

2)

=====================

//处理第二个分区数据

10+ 3 = 13 // ("Amy", 3)

13 + 4

= 17 // ("Amy", 4)同分区同Key的Val先合并

10+ 5 = 15 // ("Lucy",

5)

=====================

//将不同分区相同Key的Value合并起来

12 +

15 = 27 // "Lucy"跨分区,所以需合并

(Amy,17)

(Mobin,11)

(Lucy,27)

RDD[(K,

(V, Option[W]))] leftOuterJoin[W](other:

RDD[(K, W)]):

左外连接,包含左RDD的所有数据,如果右边没有与之匹配的用None表示

valarr= List(("A",1), ("A",2), ("B",1))

valarr1= List(("A","A1"), ("A","A2"))

valrdd=sc.parallelize(arr,2)

valrdd1=sc.parallelize(arr1,2)

valleftOutJoinRDD=rdd.leftOuterJoin(rdd1)

leftOutJoinRDD.foreach(println)

=>

(B,(1,None))

(A,(1,Some(A1)))

(A,(1,Some(A2)))

(A,(2,Some(A1)))

(A,(2,Some(A2)))

RDD[(K,

(Option[V], W))] rightOuterJoin[W](other:

RDD[(K, W)])

右外连接,包含右RDD的所有数据,如果左边没有与之匹配的用None表示

valarr= List(("A",1), ("A",2))

valarr1= List(("A","A1"), ("A","A2"), ("B",1))

valrdd=sc.parallelize(arr,2)

valrdd1=sc.parallelize(arr1,2)

valleftOutJoinRDD=rdd.rightOuterJoin(rdd1)

leftOutJoinRDD.foreach(println)

(B,(None,1))

(A,(Some(1),A1))

(A,(Some(1),A2))

(A,(Some(2),A1))

(A,(Some(2),A2))

RDD[(K,

(V, W))] join(other: RDD[(K, W))

W:另一RDD元素的value的类型

对两个包含对的RDD根据key进行join操作,返回类型

rdd.join(otherRdd)

{(1, "leo"),(2, "jack"),(3, "tom")}

{(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)}

=>{(1,("leo",100)),(1,("leo",70)),(2,

("jack",90),(2, ("jack",80),(3, ("tom",60),(3,

("tom",50))}

RDD[(K,

(Iterable[V], Iterable[W]))] cogroup(other:

RDD[(K, W)])

同join,也是根据key进行join,只不过相同key的value分别存放到Iterable中

rdd.cogroup(otherRdd)

{(1, "leo"),(2, "jack"),(3, "tom")}

{(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)}

=>{(1,(["leo"],[100,70])),(2,

(["jack"],[90,80])),(3,

(["tom","lily"],[60,50]))}

常用Action

T reduce(f: (T, T) => T)

对所有元素进行reduce操作

rdd.reduce(_

+ _)

{1, 2, 2, 3, 3, 3}

=>14

Array[T]

collect()

将RDD中所有元素返回到一个数组里

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.collect()

{1, 2, 3, 3}

=>[1,

2, 3, 3]

Map[K,

V] collectAsMap()

作用于K-V类型的RDD上,作用与collect不同的是collectAsMap函数不包含重复的key,对于重复的key,后面的元素覆盖前面的元素

rdd.collectAsMap()

{ ("leo", 65), ("tom", 50), ("tom",

100)}

=>{

("leo", 65), ("tom", 100)}

Long count()

统计RDD中的元素个数

rdd.count()

{1, 2, 3, 3}

=>4

Map[T,

Long] countByValue()

各元素在RDD中出现的次数

注意:This method should only

be used if the resulting map is expected to be small, as the whole thing is

loaded into the driver's memory.

To handle

very large results, consider usingrdd.map(x => (x, 1L)).reduceByKey(_ + _), which

returns anRDD[T, Long]instead of amap.

rdd.countByValue()

{1, 2, 3, 3}

=>Map(1

-> 1, 3 -> 2, 2 -> 1)

Map[K,

Long] countByKey()

先根据Key进行分组,再对每组里的value分别进行计数统计

注意:This method should only

be used if the resulting map is expected to be small, as the whole thing is

loaded into the driver's memory.

To handle

very large results, consider usingrdd.mapValues(_ => 1L).reduceByKey(_ + _), whichreturns

anRDD[T, Long]instead of amap.

{ ("leo", 65), ("tom", 50), ("tom", 100),

("tom", 100) }

=>Map(leo

-> 1, tom -> 3)

T first()

取第一个元素,实质上是调用take(1)实现的

rdd.first()

{3, 2,

1, 4}

=>3

Array[T]

take(num: Int)

从RDD中返回前num个元素

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.take(2)

{3, 2, 1, 4}

=>[3,

2]

Array[T]

top(num: Int ) (implicit ord:

Ordering[T])

如果没有传递ord参数,则使用隐式参数,且提供的默认隐式参数为升序排序,可以传递一个自定义的Ordering来覆盖默认提供。top实现是将Ordering反序后再调用takeOrdered的:takeOrdered(num)(ord.reverse)

默认从RDD中返回最最大的num个元素

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.top(2)

{3, 2, 1, 4}

=>[4,

3]

Array[T]

takeOrdered(num: Int)(implicit ord:

Ordering[T])

如果没有传递ord参数,则使用隐式参数,且提供的默认隐式参数为升序排序,可以传递一个自定义的Ordering来覆盖默认提供

与top相反,默认取的是前面最小的num个元素

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.takeOrdered(2)(myOrdering)

{3, 2, 1, 4}

=>[1,

2]

T fold(zeroValue: T)(op: (T, T) => T)

zeroValue:为每个分区累计的初始值,以及不同分区累计的初始值

e.g., Nilfor list concatenation, 0

for addition, or 1 for multiplication

和reduce()一

样, 但 是 需 要

提供初始值。注意:每个分区应用op函数时,都会以zeroValue为初始值进行计算,然后将每个分区的结果合并时,还是会以zeroValue为初始值进行合并计算

valarr= Array(1,2,3,4,5);

valrdd=sc.parallelize(arr,2)//分成两分区[1,

2] [3, 4, 5]

println(rdd.fold(10)((v1, v2)

=> { println(v1 +" + "+ v2 +" =

"+ (v1 + v2)); v1 + v2 }))

//处理第一个分区数据

10+ 1 = 11

11 + 2

= 13 //从第二个元素起,每分区内先累加

=====================

//处理第一个分区数据

10+ 3 = 13

13 + 4

= 17 //从第二个元素起,每分区内先累加

17 + 5

= 22 //从第二个元素起,每分区内先累加

=====================

//将各分区汇总起来

10+ 13 = 23 //汇总时还会使用初始值来作起始

23 +

22 = 45

45

U aggregate (zeroValue: U)(seqOp: (U, T) => U,

combOp: (U, U) => U)

初始值类型与原始数据类型可以不同,但初始值类型决定了返回值类型

与fold一样,计算时需要提供初始值,不同的是,分区的计算函数(seqOp)与分区合并计算函数(combOp)是不同的,但fold分区计算函数与分区合并计算函数是同一函数

rdd.fold(5)(_

+ _, _ + _)

val

arr = Array(1, 2, 3, 4);

val

rdd = sc.parallelize(arr, 2)

println(rdd.aggregate(5)(

(v1,

v2) => { println("v1 = " + v1 + " ; v2 = " + v2); v1 +

v2 },

(v1,

v2) => { println("v1 = " + v1 + " ; v2 = " + v2); v1 +

v2 })

)

过程与结果与上面的fold函数一样

Unit saveAsTextFile(path: String)

将RDD元素保存到文件中,对每个元素调用toString方法

Unit foreach(f: T => Unit)

遍历RDD中的每个元素

rdd.foreach(println(_))

comineByKey

defcombineByKey[C](

createCombiner: V => C,

mergeValue: (C, V) => C,

mergeCombiners: (C, C) => C,

partitioner: Partitioner,

mapSideCombine: Boolean =true,

serializer: Serializer =null): RDD[(K, C)]

createCombiner:在第一次遇到Key时创建组合器函数,将RDD数据集中的V类型值转换C类型值(V => C),

mergeValue:合并值函数,再次遇到相同的Key时,将createCombiner道理的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=>C

mergeCombiners:合并组合器函数,将C类型值两两合并成一个C类型值

partitioner:使用已有的或自定义的分区函数,默认是HashPartitioner

mapSideCombine:是否在map端进行Combine操作,默认为true

例:统计男性和女生的个数,并以(性别,(名字,名字....),个数)的形式输出

objectCombineByKey {

defmain(args:

Array[String]) {

valconf=newSparkConf().setMaster("local").setAppName("combinByKey")

valsc=newSparkContext(conf)

valpeople= List(("male","Mobin"), ("male","Kpop"), ("female","Lucy"), ("male","Lufei"), ("female","Amy"))

valrdd=sc.parallelize(people)

valcombinByKeyRDD=rdd.combineByKey(

(x: String) => (List(x),1),

(peo: (List[String], Int), x: String) => (x :: peo._1, peo._2+1),

(sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1::: sex2._1, sex1._2+ sex2._2))

combinByKeyRDD.foreach(println)

sc.stop()

}

}

输出:

(male,(List(Lufei, Kpop,

Mobin),3))

(female,(List(Amy,

Lucy),2))

计算过程:

Partition1:

K="male"  -->

("male","Mobin")  -->

createCombiner("Mobin") =>  peo1 = (

List("Mobin") , 1 )

K="male"  -->

("male","Kpop")  -->

mergeValue(peo1,"Kpop") =>  peo2 = (

"Kpop"  ::  peo1_1 , 1 + 1 )//Key相同调用mergeValue函数对值进行合并

K="female"  -->

("female","Lucy")  -->

createCombiner("Lucy") =>  peo3 = (

List("Lucy") , 1 )

Partition2:

K="male"  -->

("male","Lufei")  -->

createCombiner("Lufei") =>  peo4 = (  List("Lufei")

, 1 )

K="female"  -->

("female","Amy")  -->

createCombiner("Amy") =>  peo5 = (

List("Amy") , 1 )

Merger Partition:

K="male" --> mergeCombiners(peo2,peo4) =>

(List(Lufei,Kpop,Mobin))

K="female" --> mergeCombiners(peo3,peo5)

=> (List(Amy,Lucy))

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_39646706/article/details/111483055

智能推荐

linux devkmem 源码,linux dev/mem dev/kmem实现访问物理/虚拟内存-程序员宅基地

文章浏览阅读451次。dev/mem: 物理内存的全镜像。可以用来访问物理内存。/dev/kmem: kernel看到的虚拟内存的全镜像。可以用来访问kernel的内容。调试嵌入式Linux内核时,可能需要查看某个内核变量的值。/dev/kmem正好提供了访问内核虚拟内存的途径。现在的内核大都默认禁用了/dev/kmem,打开的方法是在 make menuconfig中选中 device drivers --> ..._dev/mem 源码实现

vxe-table 小众但功能齐全的vue表格组件-程序员宅基地

文章浏览阅读7.1k次,点赞2次,收藏19次。vxe-table,一个小众但功能齐全并支持excel操作的vue表格组件_vxe-table

(开发)bable - es6转码-程序员宅基地

文章浏览阅读62次。参考:http://www.ruanyifeng.com/blog/2016/01/babel.htmlBabelBabel是一个广泛使用的转码器,可以将ES6代码转为ES5代码,从而在现有环境执行// 转码前input.map(item => item + 1);// 转码后input.map(function (item) { return item..._让开发环境支持bable

FPGA 视频处理 FIFO 的典型应用_fpga 频分复用 视频-程序员宅基地

文章浏览阅读2.8k次,点赞6次,收藏29次。摘要:FPGA视频处理FIFO的典型应用,视频输入FIFO的作用,视频输出FIFO的作用,视频数据跨时钟域FIFO,视频缩放FIFO的作用_fpga 频分复用 视频

R语言:设置工作路径为当前文件存储路径_r语言设置工作目录到目标文件夹-程序员宅基地

文章浏览阅读575次。【代码】R语言:设置工作路径为当前文件存储路径。_r语言设置工作目录到目标文件夹

background 线性渐变-程序员宅基地

文章浏览阅读452次。格式:background: linear-gradient(direction, color-stop1, color-stop2, ...);<linear-gradient> = linear-gradient([ [ <angle> | to <side-or-corner>] ,]? &l..._background线性渐变

随便推点

【蓝桥杯省赛真题39】python输出最大的数 中小学青少年组蓝桥杯比赛 算法思维python编程省赛真题解析-程序员宅基地

文章浏览阅读1k次,点赞26次,收藏8次。第十三届蓝桥杯青少年组python编程省赛真题一、题目要求(注:input()输入函数的括号中不允许添加任何信息)1、编程实现给定一个正整数N,输出正整数N中各数位最大的那个数字。例如:N=132,则输出3。2、输入输出输入描述:只有一行,输入一个正整数N输出描述:只有一行,输出正整数N中各数位最大的那个数字输入样例:

网络协议的三要素-程序员宅基地

文章浏览阅读2.2k次。一个网络协议主要由以下三个要素组成:1.语法数据与控制信息的结构或格式,包括数据的组织方式、编码方式、信号电平的表示方式等。2.语义即需要发出何种控制信息,完成何种动作,以及做出何种应答,以实现数据交换的协调和差错处理。3.时序即事件实现顺序的详细说明,以实现速率匹配和排序。不完整理解:语法表示长什么样,语义表示能干什么,时序表示排序。转载于:https://blog.51cto.com/98..._网络协议三要素csdn

The Log: What every software engineer should know about real-time data's unifying abstraction-程序员宅基地

文章浏览阅读153次。主要的思想,将所有的系统都可以看作两部分,真正的数据log系统和各种各样的query engine所有的一致性由log系统来保证,其他各种query engine不需要考虑一致性,安全性,只需要不停的从log系统来同步数据,如果数据丢失或crash可以从log系统replay来恢复可以看出kafka系统在linkedin中的重要地位,不光是d..._the log: what every software engineer should know about real-time data's uni

《伟大是熬出来的》冯仑与年轻人闲话人生之一-程序员宅基地

文章浏览阅读746次。伟大是熬出来的  目录  前言  引言 时间熬成伟大:领导者要像狼一样坚忍   第一章 内圣外王——领导者的心态修炼  1. 天纵英才的自信心  2. 上天揽月的企图心  3. 誓不回头的决心  4. 宠辱不惊的平常心  5. 换位思考的同理心  6. 激情四射的热心  第二章 日清日高——领导者的高效能修炼  7. 积极主动,想到做到  8. 合理掌控自己的时间和生命  9. 制定目标,马..._当狼拖着受伤的右腿逃生时,右腿会成为前进的阻碍,它会毫不犹豫撕咬断自己的腿, 以

有源光缆AOC知识百科汇总-程序员宅基地

文章浏览阅读285次。在当今的大数据时代,人们对高速度和高带宽的需求越来越大,迫切希望有一种新型产品来作为高性能计算和数据中心的主要传输媒质,所以有源光缆(AOC)在这种环境下诞生了。有源光缆究竟是什么呢?应用在哪些领域,有什么优势呢?易天将为您解答!有源光缆(Active Optical Cables,简称AOC)是两端装有光收发器件的光纤线缆,主要构成部件分为光路和电路两部分。作为一种高性能计..._aoc 光缆

浏览器代理服务器自动配置脚本设置方法-程序员宅基地

文章浏览阅读2.2k次。在“桌面”上按快捷键“Ctrl+R”,调出“运行”窗口。接着,在“打开”后的输入框中输入“Gpedit.msc”。并按“确定”按钮。如下图 找到“用户配置”下的“Windows设置”下的“Internet Explorer 维护”的“连接”,双击选择“自动浏览器配置”。如下图 选择“自动启动配置”,并在下面的“自动代理URL”中填写相应的PAC文件地址。如下..._設置proxy腳本