## updateStateByKey 除了能够支持 RDD 的算子外,DStream 还有部分独有的*transformation*算子,这当中比较常用的是 `updateStateByKey`。文章开头的词频统计程序,只能统计每一次输入文本中单词出现的数量,想要统计所有历史输入中单词出现的数量,可以使用 `updateStateByKey` 算子。代码如下: ~~~scala object NetworkWordCountV2 { def main(args: Array[String]) { /* * 本地测试时最好指定 hadoop 用户名,否则会默认使用本地电脑的用户名, * 此时在 HDFS 上创建目录时可能会抛出权限不足的异常 */ System.setProperty("HADOOP_USER_NAME", "root") val sparkConf = new SparkConf().setAppName("NetworkWordCountV2").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(5)) /*必须要设置检查点*/ ssc.checkpoint("hdfs://hadoop001:8020/spark-streaming") val lines = ssc.socketTextStream("hadoop001", 9999) lines.flatMap(_.split(" ")).map(x => (x, 1)) .updateStateByKey[Int](updateFunction _) //updateStateByKey 算子 .print() ssc.start() ssc.awaitTermination() } /** * 累计求和 * * @param currentValues 当前的数据 * @param preValues 之前的数据 * @return 相加后的数据 */ def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = { val current = currentValues.sum val pre = preValues.getOrElse(0) Some(current + pre) } } ~~~ 使用 `updateStateByKey` 算子,你必须使用 `ssc.checkpoint()` 设置检查点,这样当使用 `updateStateByKey` 算子时,它会去检查点中取出上一次保存的信息,并使用自定义的 `updateFunction` 函数将上一次的数据和本次数据进行相加,然后返回。 ## 处理文件系统的数据 ~~~ object FileWordCount{ def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[3]").setAppName("FileWordCount") val ssc = new StreamingContext(sparkConf,Seconds(5)); var lines = ssc.textFileStream("/Users/bizzbee/Desktop/work/projects/sparktrain/ss") val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_); result.print() ssc.start() ssc.awaitTermination() } } ~~~ * 这里是**监控ss目录下新增的文件**。要从别的地方移过来,不能直接在里面写。 * 结果打印: ![]( * 文件必须相同的格式。 ## foreachRDD 将每次的RDD内容放进数据库 ~~~ object ForeachRDDApp{ def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(5)) val lines = ssc.socketTextStream("localhost", 6789) val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) result.print() result.foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { val connection = createConnection() partitionOfRecords.foreach(record => { val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")" connection.createStatement().execute(sql) }) connection.close() }) }) ssc.start() ssc.awaitTermination() } /** * 获取MySQL的连接 */ def createConnection() = { Class.forName("com.mysql.jdbc.Driver") DriverManager.getConnection("jdbc:mysql://localhost:3306/stark", "root", "934158") } } ~~~ ![]( * 存在问题: ![]( ## 窗口的DStream ![]( * *窗口长度* - The duration of the window (3 in the figure). * *窗口间隔* - The interval at which the window operation is performed (2 in the figure). ~~~scala val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10)) ~~~ ##黑名单处理 ![]( ![]( ~~~ object TranformApp{ def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") /** * 创建StreamingContext需要两个参数:SparkConf和batch interval */ val ssc = new StreamingContext(sparkConf, Seconds(5)) /** * 构建黑名单 */ val blacks = List("wade", "james") val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true)) val lines = ssc.socketTextStream("localhost", 6789) val clicklog = => (x.split(",")(1), x)).transform(rdd => { rdd.leftOuterJoin(blacksRDD) .filter(x=> x._2._2.getOrElse(false) != true) .map(x=>x._2._1) }) clicklog.print() ssc.start() ssc.awaitTermination() } } ~~~ ![](