🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
与 RDDs 类似,转换允许修改输入 DStream 中的数据。DStreams 支持许多在普通 Spark RDD 上可用的转换。一些常见的转换操作定义如下。 <br/> **map(func):** 通过函数 func 传递源 DStream 的每个元素来返回一个新的DStream。 **flatMap(func):** 与 map 类似,但是每个输入项可以映射到 0 或多个输出项。 **filter(func):** 通过只选择 func 返回 true 的源 DStream 的记录来返回一个新的 DStream。 **repartition(numPartitions):** 通过创建更多或更少的分区来改变 DStream 中的并行度。 **union(otherStream):** 返回一个新的 DStream,它包含源 DStream 和otherDStream 中元素的并集。 **count():** 通过计算源 DStream 的每个 RDD 中的元素数量,返回一个新的单元素 RDDs DStream。 **reduce(func):** 通过使用函数 func(接受两个参数并返回一个参数)聚合源DStream 的每个 RDD 中的元素,返回一个新的单元素 RDDs DStream。这个函数应该是结合律和交换律,这样才能并行计算。 **countByValue():** 当对类型为 K 的元素的 DStream 调用时,返回一个新的 DStream (K,Long)对,其中每个键的值是它在源 DStream 的每个 RDD中的频率。 **reduceByKey(func, [numTasks]):** 当在(K, V)对的 DStream 上调用时,返回一个新的(K, V)对的 DStream,其中每个键的值使用给定的 reduce 函数进行聚合。注意:在默认情况下,这将使用 Spark 的默认并行任务数(本地模式为 2,而在集群模式下,该数量由配置属性 spark.default.parallelism决定)来进行分组。可以传递一个可选的 numTasks 参数来设置不同数量的任务。 **join(otherStream, [numTasks]):** 当调用两个 DStream (K, V)和(K, W)对时,返回一个新的 DStream (K,(V, W))对,每个键的所有对的元素。 **cogroup(otherStream, [numTasks]):** 当调用(K, V)和(K, W)对的 DStream 时,返回一个新的(K, Seq[V],Seq[W])元组 DStream。 **transform(func):** 通过对源 DStream 的每个 RDD 应用一个 RDD-to-RDD函数来返回一个新的 DStream。这可以用来在 DStream 上执行任意的RDD 操作。 **updateStateByKey(func):** 返回一个新的“状态”DStream,其中通过对键的前一个状态和键的新值应用给定的函数来更新每个键的状态。这可以用来维护每个键的任意状态数据。 <br/> 相比RDD转换,DStream有两个特殊操作:updateStateByKey操作和window操作。 <br/> # 1. updateStateByKey 操作 updateStateByKey 操作允许维护任意状态,同时不断地用新信息更新它。要使用它,必须执行两个步骤。 (1)定义状态——状态可以是任意的数据类型。 (2)定义状态更新函数——使用一个函数指定如何使用输入流中的前一个状态和新值来更新状态。 <br/> 在每个批处理中,Spark 将对所有现有 keys 应用<mark>状态更新</mark>功能,而不管它们在批处理中是否有新数据。如果更新函数返回 None,则键值对将被删除。 <br/> 例如:需要维护在整个文本数据流中看到的每个单词的运行计数。这里,运行计数是状态,它是一个整数。将更新函数定义如下: ```scala // 将更新函数 def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... //使用前一个运行的计数添加新值以获得新计数 Some(newCount) } // 函数调用示例: val runningCounts = pairs.updateStateByKey[Int](updateFunction _) ``` <br/> 完整代码: ```scala import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 使用 Spark Streaming 处理有状态的数据 */ object StatefulWordCount { def main(args: Array[String]) { val sparkConf = new SparkConf sparkConf.setAppName("StatefulWordCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(5)) ssc.checkpoint(".") val lines = ssc.socketTextStream("localhost", 6789) val result = lines.flatMap(_.split(" ")).map((_, 1)) val state = result.updateStateByKey(updateFunction) state.print() ssc.start() ssc.awaitTermination() } def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = { val curr = currentValues.sum val pre = preValues.getOrElse(0) Some(curr + pre) } } ``` <br/> # 2. window操作 Spark Streaming 还提供了窗口计算,它允许在数据的滑动窗口上应用转换。下图演示了这个滑动窗口。 :-: ![](https://img.kancloud.cn/8c/3c/8c3c7d032f06fb980f7980296a3caf26_1137x342.jpg) 滑动窗口 <br/> 如图所示,每当窗口在源 DStream 上滑动时,位于窗口内的源 RDDs 就会被合并并操作,以生成窗口化的 DStream 的 RDDs。在本例中,操作应用于数据的最后 3 个时间单位,幻灯片应用于 2 个时间单位。这表明任何窗口操作都需要指定两个参数。 * 窗口长度—窗口的持续时间(图中为 3)。 * 滑动间隔—窗口操作执行的间隔(图中为 2)。 这两个参数必须是源 DStream 的批处理间隔的倍数(图中为 1)。 <br/> 例如,希望通过每 10 秒在最后 30 秒的数据中生成单词计数来扩展前面的示例。为此,必须在最后 30 秒的数据中对(word, 1)的 DStream 应用 reduceByKey操作,可以使用 reduceByKeyAndWindow 操作完成。 ```scala //每 10 秒对最后 30 秒的数据进行 reduceByKey val windowedWordCounts = pairs.reduceByKeyAndWindow( (a:Int,b:Int) => (a + b), Seconds(30), Seconds(10)) ``` <br/> 还有一种更通用的方式:`window(windowLength, slideInterval)`。下述代码效果与上面相同。 ```scala val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream("localhost",6789) val words = lines.window(Seconds(30),Seconds(10)).flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ```