企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
实现:接收TCPSocket的单词,并统计单词数量。 (1)编写Streaming程序 ```scala import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf} import org.apache.spark.streaming.{Seconds, StreamingContext} object NetworkWordCount { def main(args: Array[String]): Unit = { /** ************ 1. 创建StreamingContext **************/ // local[n]中的n要大于接收器的个数. 即要n大于JVM线程的个数,每个线程都会监听下面的9999端口. val conf: SparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getName) val ssc: StreamingContext = new StreamingContext(conf, Seconds(5)) /** ************ 2. 加载数据源 **************/ // socketTextStream(hostname, port, StorageLevel) val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101", 9999, StorageLevel.MEMORY_AND_DISK_SER) /** ************ 3. 进行统计 **************/ val words: DStream[String] = lines.flatMap(_.split(" ")) val wordCounts: DStream[(String, Int)] = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() // 打印输出 /** ************ 4. 启动Streaming程序 **************/ ssc.start() /** *********** 5. 等待应用程序终止 ****************/ // 或者调用ssc.stop()直接停止当前的Streaming程序 ssc.awaitTermination() } } ``` (2)先启动`nc`,再启动上面的Streaming程序。 在Linux终端启动`nc`,并输入一些单词。 ```shell [root@hadoop101 /]# nc -lk 9999 hello spark hello scala kafka kafka ``` 如果在Linux中没有安装`nc`,运行如下命令安装: ```shell yum -y install nc ``` (3)启动上面的Streaming程序,控制台输出如下: ```txt ------------------------------------------- Time: 1611129350000 ms ------------------------------------------- (spark,1) (scala,1) (hello,2) (kafka,2) ``` (4)再次在`nc`中输入单词,Streaming程序会监控并收到数据进行统计 ```shell [root@hadoop101 /]# nc -lk 9999 hello spark hello scala kafka kafka sparkstreaming sparkstreaming wordcount ``` Streaming程序打印出如下信息: ```txt ------------------------------------------- Time: 1611129495000 ms ------------------------------------------- (wordcount,1) (sparkstreaming,2) ```