合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
使用Spark Streaming处理带状态的数据。 * 需求:计算到目前为止累计词频的个数 * 分析:DStream转换操作包括无状态转换和有状态转换 * 无状态转换:每个批次的处理不依赖于之前批次的数据 * 有状态转换:当前批次的处理需要使用之前批次的数据 * updateStateByKey属于有状态转换,可以跟踪状态的变化 * 实现要点 * 定义状态:状态数据可以是任意类型 * 定义状态更新函数:参数为数据流之前的状态和新的数据流数据 (1)编写Streaming程序 ```scala import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @Date 2021/2/8 */ object StatefulWordCount { def main(args: Array[String]): Unit = { /** ********* 1. 创建StreamingContext ************/ val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]") // Seconds(5)是批处理间隔,即将5秒内新收集的数据作为一个单位进行处理 val ssc = new StreamingContext(conf, Seconds(5)) /** ********* 2. 加载数据 ************/ // 设置checkpoint目录, 用来保存状态 ssc.checkpoint("file:///E:\\hadoop\\output") // socketTextStream(hostname, port, StorageLevel) val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101", 9999, StorageLevel.MEMORY_AND_DISK_SER_2) /** ************ 3. 进行统计 **************/ // 使用了 updateStateByKey 状态类的算子, 可以统计截止到当前位置的累加值, 需要传入一个更新状态的函数 val result: DStream[(String, Int)] = lines.flatMap(_.split("\\s+")) .map((_, 1)) //.updateStateByKey((x,y)=>Some(x.sum+y.getOrElse(0))) .updateStateByKey(updateFunction) result.print() // 打印输出 /** ************ 4. 启动Streaming程序 **************/ ssc.start() /** *********** 5. 等待应用程序终止 ****************/ ssc.awaitTermination() } /** * 定义一个更新状态的函数 * * @param currentValues 当前批次的value值的序列 * @param preValues 前一批次的统计状态值 * @return 更新状态值 */ def updateFunction(currentValues: Seq[Int], preValues: Option[Int]) = { val curr: Int = currentValues.sum val pre: Int = preValues.getOrElse(0) Some(curr + pre) } } ``` (2)启动`nc`并输入一些单词 ```shell [root@hadoop101 /]# nc -lk 9999 hello python hadoop hello python kafka ``` (3)运行上面的Streaming程序,控制台输出信息如下 ```txt ------------------------------------------- Time: 1612769255000 ms ------------------------------------------- (python,2) (hadoop,1) (hello,2) (kafka,1) ``` (4)再输入一些单词 ```shell [root@hadoop101 /]# nc -lk 9999 hello python hadoop hello python kafka python kafka ``` 可以看到python和kafka的单词数变多了 ```txt ------------------------------------------- Time: 1612769285000 ms ------------------------------------------- (python,3) (hadoop,1) (hello,2) (kafka,2) ```