企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
| Action | 描述| | --- | --- | | `reduce(func)` | 使用函数 func(它接受两个参数并返回一个)聚合数据集的元素。 | | `collect()` | 在驱动程序(Driver)中以数组的形式返回数据集的所有元素。 | | `count()` | 返回数据集中元素的数量。 | | `first()` | 返回数据集的第一个元素(类似于 take(1))。 | | `take(n)` | 返回一个包含数据集前 n 个元素的数组。 | | `takeSample(withReplacement, num, [seed])` | 返回一个数组,其中包含数据集的随机 num 元素样本,可以替换,也可以不替换,可以预先指定随机数生成器种子。 | | `takeOrdered(n, [ordering])` | 使用 RDD 的自然顺序或自定义比较器返回 RDD 的前 n 个元素。 | | `saveAsTextFile(path)` | 将数据集的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS 或任何其他 hadoop 支持的文件系统的给定目录中。Spark 将对每个元素调用 toString,将其转换为文件中的一行文本。 | | `saveAsSequenceFile(path)` | 将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统的给定路径中。这在实现 Hadoop 的可写接口的键值对的 RDDs 上是可用的。在 Scala 中,它也可用于隐式转换为可写的类型(Spark 包括对Int、Double、String 等基本类型的转换)。 | | `saveAsObjectFile(path)` | 使用 Java 序列化以简单的格式编写数据集的元素,然后可以使用 SparkContext.objectFile()加载这些元素。 | | `countByKey()` | 仅在类型(K, V)的 RDDs 上可用。返回(K, Int)对的Map 表示每个键的计数。 | | `foreach(func)` | 对数据集的每个元素运行函数 func | |`lookup(key)` | 用于PairRDD,返回K对应的所有V值 | 一些示例: ```scala import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object ActionOps { def main(args: Array[String]): Unit = { val conf:SparkConf = new SparkConf().setAppName(this.getClass.getName) .setMaster("local[4]") val sc:SparkContext = new SparkContext(conf) val rdd1:RDD[Int] = sc.parallelize(1 to 10) // 统计元素个数 println(rdd1.count()) // 10 // 收集RDD的所有数据,并遍历输出 rdd1.collect().foreach(x => print(s"$x ")) // 1 2 3 4 5 6 7 8 9 10 // 取出前5个元素 rdd1.take(5).foreach(x => print(s"$x ")) // 1 2 3 4 5 // 取出第一个元素 println(rdd1.first()) // 1 val words: RDD[String] = sc.parallelize(List("hello wrold hadoop spark ","spark python java hello")) println(words.flatMap(_.split("\\s+")).map((_, 1)).reduce((x, y) => (x._1, x._2 + y._2))) // (spark,8), 这里就是统计一共有多少个单词 // 获取最值与求和 println(rdd1.max()) // 10 println(rdd1.min) // 1 println(rdd1.sum()) // 55.0 // 将RDD输出到文件中,文件目录不能已经存在 // 如果要保持到hdfs上:hdfs://hadoop101:9000/data // 默认4个分区,所以会生成4个part-文件 words.saveAsTextFile("file:///E:\\hadoop\\output") // lookup val rdd2:RDD[(Char, Int)] = sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4))) println(rdd2.lookup('a')) // WrappedArray(1, 2) sc.stop() // 释放资源 } } ```