企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 一、常用的转换算子 需要操作的Transformation算子说明如下: ### 1.1. map map(func)返回一个新的分布式数据集,由每个原元素经过func函数转换后组成 ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps1(sc) sc.stop() } /** * 1、map:将集合中每个元素乘以7 * map(func):返回一个新的分布式数据集,由每个原元素经过func函数转换后组成 */ def transformationOps1(sc:SparkContext): Unit = { val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val listRDD = sc.parallelize(list) val retRDD = listRDD.map(num => num * 7) retRDD.foreach(num => println(num)) } } ``` 输出结果 ``` 42 7 49 14 56 21 63 28 70 35 ``` ### 1.2. filter filter(func)返回一个新的数据集,由经过func函数后返回值为true的原元素组成 ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps2(sc) sc.stop() } /** * 2、filter:过滤出集合中的奇数 * filter(func): 返回一个新的数据集,由经过func函数后返回值为true的原元素组成 * * 一般在filter操作之后都要做重新分区(因为可能数据量减少了很多) */ def transformationOps2(sc:SparkContext): Unit = { val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val listRDD = sc.parallelize(list) val retRDD = listRDD.filter(num => num % 2 == 0) retRDD.foreach(println) } } ``` 输出结果 ``` 6 2 8 4 10 ``` ### 1.3.flatMap flatMap(func)类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素) ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps3(sc) sc.stop() } /** * 3、flatMap:将行拆分为单词 * flatMap(func):类似于map,但是每一个输入元素, * 会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素) */ def transformationOps3(sc:SparkContext): Unit = { val list = List("hello you", "hello he", "hello me") val listRDD = sc.parallelize(list) val wordsRDD = listRDD.flatMap(line => line.split(" ")) wordsRDD.foreach(println) } } ``` 输出结果 ``` hello hello he you hello mes ``` ### 1.4. sample sample(withReplacement, frac, seed)根据给定的随机种子seed,随机抽样出数量为frac的数据. ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps4(sc) sc.stop() } /** * 4、sample:根据给定的随机种子seed,随机抽样出数量为frac的数据 * sample(withReplacement, frac, seed): 根据给定的随机种子seed,随机抽样出数量为frac的数据 * 抽样的目的:就是以样本评估整体 * withReplacement: * true:有放回的抽样 * false:无放回的抽样 * frac:就是样本空间的大小,以百分比小数的形式出现,比如20%,就是0.2 * * 使用sample算子计算出来的结果可能不是很准确,1000个数,20%,样本数量在200个左右,不一定为200 * * 一般情况下,使用sample算子在做spark优化(数据倾斜)的方面应用最广泛 */ def transformationOps4(sc:SparkContext): Unit = { val list = 1 to 1000 val listRDD = sc.parallelize(list) val sampleRDD = listRDD.sample(false, 0.2) sampleRDD.foreach(num => print(num + " ")) println println("sampleRDD count: " + sampleRDD.count()) println("Another sampleRDD count: " + sc.parallelize(list).sample(false, 0.2).count()) } } ``` 输出结果 ``` sampleRDD count: 219 Another sampleRDD count: 203 ``` ### 1.5.union union(otherDataset)返回一个新的数据集,由原数据集和参数联合而成 ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps5(sc) sc.stop() } /** * 5、union:返回一个新的数据集,由原数据集和参数联合而成 * union(otherDataset): 返回一个新的数据集,由原数据集和参数联合而成 * 类似数学中的并集,就是sql中的union操作,将两个集合的所有元素整合在一块,包括重复元素 */ def transformationOps5(sc:SparkContext): Unit = { val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val list2 = List(7, 8, 9, 10, 11, 12) val listRDD1 = sc.parallelize(list1) val listRDD2 = sc.parallelize(list2) val unionRDD = listRDD1.union(listRDD2) unionRDD.foreach(println) } } ``` 输出结果 ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps5(sc) sc.stop() } /** * 5、union:返回一个新的数据集,由原数据集和参数联合而成 * union(otherDataset): 返回一个新的数据集,由原数据集和参数联合而成 * 类似数学中的并集,就是sql中的union操作,将两个集合的所有元素整合在一块,包括重复元素 */ def transformationOps5(sc:SparkContext): Unit = { val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val list2 = List(7, 8, 9, 10, 11, 12) val listRDD1 = sc.parallelize(list1) val listRDD2 = sc.parallelize(list2) val unionRDD = listRDD1.union(listRDD2) unionRDD.foreach(println) } } ``` 输出结果 ``` 1 6 2 7 3 8 4 9 5 10 7 8 9 10 11 12 ``` ### 1.6. groupByKey groupByKey([numTasks])在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps6(sc) sc.stop() } /** * 6、groupByKey:对数组进行 group by key操作 * groupByKey([numTasks]): 在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。 * 注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task * mr中: * <k1, v1>--->map操作---><k2, v2>--->shuffle---><k2, [v21, v22, v23...]>---><k3, v3> * groupByKey类似于shuffle操作 * * 和reduceByKey有点类似,但是有区别,reduceByKey有本地的规约,而groupByKey没有本地规约,所以一般情况下, * 尽量慎用groupByKey,如果一定要用的话,可以自定义一个groupByKey,在自定义的gbk中添加本地预聚合操作 */ def transformationOps6(sc:SparkContext): Unit = { val list = List("hello you", "hello he", "hello me") val listRDD = sc.parallelize(list) val wordsRDD = listRDD.flatMap(line => line.split(" ")) val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1)) pairsRDD.foreach(println) val gbkRDD:RDD[(String, Iterable[Int])] = pairsRDD.groupByKey() println("=============================================") gbkRDD.foreach(t => println(t._1 + "..." + t._2)) } } ``` 输出结果 ``` (hello,1) (hello,1) (you,1) (he,1) (hello,1) (me,1) ============================================= you...CompactBuffer(1) hello...CompactBuffer(1, 1, 1) he...CompactBuffer(1) me...CompactBuffer(1) ``` ### 1.7. reduceByKey reduceByKey(func, [numTasks])在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。 ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps7(sc) sc.stop() } /** * 7、reduceByKey:统计每个班级的人数 * reduceByKey(func, [numTasks]): 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集, * key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。 * * 需要注意的是还有一个reduce的操作,其为action算子,并且其返回的结果只有一个,而不是一个数据集 * 而reduceByKey是一个transformation算子,其返回的结果是一个数据集 */ def transformationOps7(sc:SparkContext): Unit = { val list = List("hello you", "hello he", "hello me") val listRDD = sc.parallelize(list) val wordsRDD = listRDD.flatMap(line => line.split(" ")) val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1)) val retRDD:RDD[(String, Int)] = pairsRDD.reduceByKey((v1, v2) => v1 + v2) retRDD.foreach(t => println(t._1 + "..." + t._2)) } } ``` 输出结果如下: ``` you...1 hello...3 he...1 me...1 ``` ### 1.8. join双流融合 join(otherDataset, [numTasks])在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集 ``` object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps8(sc) sc.stop() } /** * 8、join:打印关联的组合信息 * join(otherDataset, [numTasks]): 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集 * 学生基础信息表和学生考试成绩表 * stu_info(sid ,name, birthday, class) * stu_score(sid, chinese, english, math) * * * Serialization stack: - object not serializable 这种分布式计算的过程,一个非常重要的点,传递的数据必须要序列化 通过代码测试,该join是等值连接(inner join) A.leftOuterJoin(B) A表所有的数据都包涵,B表中在A表没有关联的数据,显示为null 之后执行一次filter就是join的结果 */ def transformationOps8(sc: SparkContext): Unit = { val infoList = List( "1,钟 潇,1988-02-04,bigdata", "2,刘向前,1989-03-24,linux", "3,包维宁,1984-06-16,oracle") val scoreList = List( "1,50,21,61", "2,60,60,61", "3,62,90,81", "4,72,80,81" ) val infoRDD:RDD[String] = sc.parallelize(infoList) val scoreRDD:RDD[String] = sc.parallelize(scoreList) val infoPairRDD:RDD[(String, Student)] = infoRDD.map(line => { val fields = line.split(",") val student = new Student(fields(0), fields(1), fields(2), fields(3)) (fields(0), student) }) val scorePairRDD:RDD[(String, Score)] = scoreRDD.map(line => { val fields = line.split(",") val score = new Score(fields(0), fields(1).toFloat, fields(2).toFloat, fields(3).toFloat) (fields(0), score) }) val joinedRDD:RDD[(String, (Student, Score))] = infoPairRDD.join(scorePairRDD) joinedRDD.foreach(t => { val sid = t._1 val student = t._2._1 val score = t._2._2 println(sid + "\t" + student + "\t" + score) }) println("=========================================") val leftOuterRDD:RDD[(String, (Score, Option[Student]))] = scorePairRDD.leftOuterJoin(infoPairRDD) leftOuterRDD.foreach(println) } } ``` 输出结果如下: ``` 3 3 包维宁 1984-06-16 oracle 3 62.0 90.0 81.0 2 2 刘向前 1989-03-24 linux 2 60.0 60.0 61.0 1 1 钟 潇 1988-02-04 bigdata 1 50.0 21.0 61.0 ========================================= (4,(4 72.0 80.0 81.0,None)) (3,(3 62.0 90.0 81.0,Some(3 包维宁 1984-06-16 oracle))) (2,(2 60.0 60.0 61.0,Some(2 刘向前 1989-03-24 linux))) (1,(1 50.0 21.0 61.0,Some(1 钟 潇 1988-02-04 bigdata))) ``` ### 1.9.sortByKey 测试代码如下: ~~~ object _02SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) transformationOps7(sc) sc.stop() } /** * sortByKey:将学生身高进行(降序)排序 * 身高相等,按照年龄排(升序) */ def transformationOps9(sc: SparkContext): Unit = { val list = List( "1,李 磊,22,175", "2,刘银鹏,23,175", "3,齐彦鹏,22,180", "4,杨 柳,22,168", "5,敦 鹏,20,175" ) val listRDD:RDD[String] = sc.parallelize(list) /* // 使用sortBy操作完成排序 val retRDD:RDD[String] = listRDD.sortBy(line => line, numPartitions = 1)(new Ordering[String] { override def compare(x: String, y: String): Int = { val xFields = x.split(",") val yFields = y.split(",") val xHgiht = xFields(3).toFloat val yHgiht = yFields(3).toFloat val xAge = xFields(2).toFloat val yAge = yFields(2).toFloat var ret = yHgiht.compareTo(xHgiht) if (ret == 0) { ret = xAge.compareTo(yAge) } ret } } ,ClassTag.Object.asInstanceOf[ClassTag[String]]) */ // 使用sortByKey完成操作,只做身高降序排序 val heightRDD:RDD[(String, String)] = listRDD.map(line => { val fields = line.split(",") (fields(3), line) }) val retRDD:RDD[(String, String)] = heightRDD.sortByKey(ascending = false, numPartitions = 1) // 需要设置1个分区,否则只是各分区内有序 retRDD.foreach(println) // 使用sortByKey如何实现sortBy的二次排序?将上面的信息写成一个java对象,然后重写compareTo方法,在做map时,key就为该对象本身,而value可以为null } } ~~~ 输出结果如下: ~~~ (180,3,齐彦鹏,22,180) (175,1,李 磊,22,175) (175,2,刘银鹏,23,175) (175,5,敦 鹏,20,175) (168,4,杨 柳,22,168) ~~~ 下面是一个快速入门的 demo: ~~~ scala> val rdd = sc.parallelize(Seq((1,"one"),(2,"two"),(3,"three"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[10] at parallelize at <console>:21 scala> rdd.sortByKey(true, 1).foreach(println) (1,one) (2,two) (3,three) ~~~ ### 1.10.combineByKey 与 aggregateByKey 下面的代码分别使用 combineByKey 和 aggregateByKey 来模拟 groupByKey 和 reduceBykey,所以是有 4 个操作,只要把 combineByKey 模拟 groupByKey 的例子掌握了,其它三个相对就容易许多了。 ~~~ /** * spark的transformation操作: * aggregateByKey * combineByKey * * 使用combineByKey和aggregateByKey模拟groupByKey和reduceByKey * * 通过查看源码,我们发现aggregateByKey底层,还是combineByKey * * 问题:combineByKey和aggregateByKey的区别? * aggregateByKey是柯里化形式的,目前底层源码还没时间去分析,所知道的区别是这个 */ object _03SparkTransformationOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_03SparkTransformationOps.getClass.getSimpleName) Logger.getLogger("org.apache.spark").setLevel(Level.OFF) val sc = new SparkContext(conf) // combineByKey2GroupByKey(sc) // combineByKey2ReduceByKey(sc) // aggregateByKey2ReduceByKey(sc) aggregateByKey2GroupByKey(sc) sc.stop() } /** * 使用aggregateByKey模拟groupByKey */ def aggregateByKey2GroupByKey(sc: SparkContext): Unit = { val list = List("hello bo bo", "zhou xin xin", "hello song bo") val lineRDD = sc.parallelize(list) val wordsRDD = lineRDD.flatMap(line => line.split(" ")) val pairsRDD = wordsRDD.map(word => (word, 1)) val retRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.aggregateByKey(ArrayBuffer[Int]()) ( // 这里需要指定value的类型为ArrayBuffer[Int]() (part, num) => { part.append(num) part }, (part1, part2) => { part1.++=(part2) part1 } ) retRDD.foreach(println) } /** * 使用aggregateByKey模拟reduceByKey * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] (zeroValue: U)就对应的是combineByKey中的第一个函数的返回值 seqOp 就对应的是combineByKey中的第二个函数,也就是mergeValue combOp 就对应的是combineByKey中的第三个函数,也就是mergeCombiners */ def aggregateByKey2ReduceByKey(sc:SparkContext): Unit = { val list = List("hello bo bo", "zhou xin xin", "hello song bo") val lineRDD = sc.parallelize(list) val wordsRDD = lineRDD.flatMap(line => line.split(" ")) val pairsRDD = wordsRDD.map(word => (word, 1)) val retRDD:RDD[(String, Int)] = pairsRDD.aggregateByKey(0) ( (partNum, num) => partNum + num, // 也就是mergeValue (partNum1, partNum2) => partNum1 + partNum2 // 也就是mergeCombiners ) retRDD.foreach(println) } /** * 使用reduceByKey模拟groupByKey */ def combineByKey2ReduceByKey(sc:SparkContext): Unit = { val list = List("hello bo bo", "zhou xin xin", "hello song bo") val lineRDD = sc.parallelize(list) val wordsRDD = lineRDD.flatMap(line => line.split(" ")) val pairsRDD = wordsRDD.map(word => (word, 1)) /** * 对于createCombiner1 mergeValue1 mergeCombiners1 * 代码的参数已经体现得很清楚了,其实只要理解了combineByKey模拟groupByKey的例子,这个就非常容易了 */ var retRDD:RDD[(String, Int)] = pairsRDD.combineByKey(createCombiner1, mergeValue1, mergeCombiners1) retRDD.foreach(println) } /** * reduceByKey操作,value就是该数值本身,则上面的数据会产生: * (hello, 1) (bo, 1) (bo, 1) * (zhou, 1) (xin, 1) (xin, 1) * (hello, 1) (song, 1) (bo, 1) * 注意有别于groupByKey的操作,它是创建一个容器 */ def createCombiner1(num:Int):Int = { num } /** * 同一partition内,对于有相同key的,这里的mergeValue直接将其value相加 * 注意有别于groupByKey的操作,它是添加到value到一个容器中 */ def mergeValue1(localNum1:Int, localNum2:Int): Int = { localNum1 + localNum2 } /** * 将两个不同partition中的key相同的value值相加起来 * 注意有别于groupByKey的操作,它是合并两个容器 */ def mergeCombiners1(thisPartitionNum1:Int, anotherPartitionNum2:Int):Int = { thisPartitionNum1 + anotherPartitionNum2 } /** * 使用combineByKey模拟groupByKey */ def combineByKey2GroupByKey(sc:SparkContext): Unit = { val list = List("hello bo bo", "zhou xin xin", "hello song bo") val lineRDD = sc.parallelize(list) val wordsRDD = lineRDD.flatMap(line => line.split(" ")) val pairsRDD = wordsRDD.map(word => (word, 1)) // 输出每个partition中的map对 pairsRDD.foreachPartition( partition => { println("<=========partition-start=========>") partition.foreach(println) println("<=========partition-end=========>") }) val gbkRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners) gbkRDD.foreach(println) // 如果要测试最后groupByKey的结果是在几个分区,可以使用下面的代码进行测试 /*gbkRDD.foreachPartition(partition => { println("~~~~~~~~~~~~~~~~~~~~~~~~~~~") partition.foreach(println) })*/ } /** * 初始化,将value转变成为标准的格式数据 * 是在每个分区中进行的操作,去重后的key有几个,就调用次, * 因为对于每个key,其容器创建一次就ok了,之后有key相同的,只需要执行mergeValue到已经创建的容器中即可 */ def createCombiner(num:Int):ArrayBuffer[Int] = { println("----------createCombiner----------") ArrayBuffer[Int](num) } /** * 将key相同的value,添加到createCombiner函数创建的ArrayBuffer容器中 * 一个分区内的聚合操作,将一个分区内key相同的数据,合并 */ def mergeValue(ab:ArrayBuffer[Int], num:Int):ArrayBuffer[Int] = { println("----------mergeValue----------") ab.append(num) ab } /** * 将key相同的多个value数组,进行整合 * 分区间的合并操作 */ def mergeCombiners(ab1:ArrayBuffer[Int], ab2:ArrayBuffer[Int]):ArrayBuffer[Int] = { println("----------mergeCombiners----------") ab1 ++= ab2 ab1 } } ~~~ 输出结果如下: ~~~ /* combineByKey模拟groupByKey的一个输出效果,可以很好地说明createCombiner、mergeValue和mergeCombiners各个阶段的执行时机: <=========partition-start=========> <=========partition-start=========> (hello,1) (zhou,1) (bo,1) (xin,1) (bo,1) (xin,1) <=========partition-end=========> (hello,1) (song,1) (bo,1) <=========partition-end=========> ----------createCombiner---------- ----------createCombiner---------- ----------createCombiner---------- ----------createCombiner---------- ----------mergeValue---------- ----------mergeValue---------- ----------createCombiner---------- ----------createCombiner---------- ----------createCombiner---------- ----------mergeCombiners---------- ----------mergeCombiners---------- (song,ArrayBuffer(1)) (hello,ArrayBuffer(1, 1)) (bo,ArrayBuffer(1, 1, 1)) (zhou,ArrayBuffer(1)) (xin,ArrayBuffer(1, 1)) */ ~~~ ## 二、Transformations 下面的表格列了 Spark 支持的一些常用 transformations。详细内容请参阅 RDD API 文档([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html), [Python](https://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html)) 和 PairRDDFunctions 文档([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))。 **Transformation 转换算子**: | 转换算子 | 含义 | | ---------------------------------------------------- | ------------------------------------------------------------ | | **map**(func) | 返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成 | | **filter**(func) | 返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成 | | **flatMap**(func) | 类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素 (所以 func 应该返回一个序列,而不是单一元素) | | **mapPartitions**(func) | 类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U] | | **mapPartitionsWithIndex**(func) | 类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 (Int, Interator[T]) => Iterator[U] | | sample(withReplacement, fraction, seed) | 根据 fraction 指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed 用于指定随机数生成器种子 | | **union**(otherDataset) | 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD | | intersection(otherDataset) | 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD | | **distinct**([numTasks])) | 对源 RDD 进行去重后返回一个新的 RDD | | **groupByKey**([numTasks]) | 在一个 (K,V) 的 RDD 上调用,返回一个 (K, Iterator[V]) 的 RDD | | **reduceByKey**(func, [numTasks]) | 在一个 (K,V) 的 RDD 上调用,返回一个 (K,V) 的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置 | | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 对 PairRDD 中相同的 Key 值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和 aggregate 函数类似,aggregateByKey 返回值的类型不需要和 RDD 中 value 的类型一致 | | **sortByKey**([ascending], [numTasks]) | 在一个 (K,V) 的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的 (K,V) 的 RDD | | sortBy(func,[ascending], [numTasks]) | 与 sortByKey 类似,但是更灵活 | | **join**(otherDataset, [numTasks]) | 在类型为 (K,V) 和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的 (K,(V,W)) 的 RDD | | cogroup(otherDataset, [numTasks]) | 在类型为 (K,V) 和(K,W)的 RDD 上调用,返回一个 (K,(Iterable,Iterable)) 类型的 RDD | | cartesian(otherDataset) | 笛卡尔积 | | pipe(command, [envVars]) | 对 rdd 进行管道操作 | | **coalesce**(numPartitions) | 减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作 | | **repartition**(numPartitions) | 重新给 RDD 分区 |