🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
Spark Streaming 通过 Push 和 Pull 两种方式对接 Flume 数据源。以 Spark Streaming 的角度来看,Push 方式属于推送(由 Flume 向 Spark 推送)而 Pull 属于拉取(Spark 拉取 Flume 的输出)。<br/> 不论以何种方式,开发过程类似,都是由 Spark Streaming 对接 Flume 数据流,Flume 做为 Spark Streaming 的数据源。Push 和 Pull 两者的差别主要体现在Flume Sink 的不同,而 Flume Source 与 Channel 不会受影响。在演示示例时,Flume Source 以 nectcat 为例,Channel 为 memory,重点关注 Sink 的变化。在下文中也是如此。 [TOC] # 1. Push方式 1. 编写Flume的配置文件 ```conf # 定义 source, channel, 和sink的名字 a1.sources = s1 a1.channels = c1 a1.sinks = avroSink # 对source的一些设置 a1.sources.s1.type = netcat a1.sources.s1.bind = localhost a1.sources.s1.port = 5678 a1.sources.s1.channels = c1 # 对channel的一些设置 a1.channels.c1.type = memory # 对sink的一些设置 a1.sinks.avroSink.type = avro a1.sinks.avroSink.channel = c1 a1.sinks.avroSink.hostname = hadoop101 a1.sinks.avroSink.port = 9999 ``` 2. 编写Spark Streaming程序 ```scala package streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @Date 2021/2/8 */ object FlumePushWordCount { def main(args: Array[String]): Unit = { /** ********* 1. 创建StreamingContext ************/ val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]") // Seconds(5)是批处理间隔,即将5秒内新收集的数据作为一个单位进行处理 val ssc = new StreamingContext(conf,Seconds(5)) /** ********* 2. 加载数据 ************/ // FlumeUtils.createStream(StreamingContext, hostname, port) val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(ssc,"hadoop101",9999) val lines: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()).trim) /** ************ 3. 进行统计 **************/ val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) result.print() // 打印输出 /** ********* 4. 启动Streaming程序 ************/ ssc.start() /** *********** 5. 等待应用程序终止 ****************/ ssc.awaitTermination() } } ``` 3. 将上面的Spark Streaming打成jar包,上传到集群运行 ```shell [root@hadoop101 spark]# bin/spark-submit --class streaming.FlumePushWordCount /opt/software/streaming-1.0-SNAPSHOT-jar-with-dependencies.jar ``` 4. 启动Flume ```shell [root@hadoop101 flume]# bin/flume-ng agent --name a1 -f myconf/sink_spark-push.conf -Dflume.root.logger=INFO,console ``` 5. 启动`telnet`并输入数据 ```shell [root@hadoop101 /]# telnet localhost 5678 Trying ::1... telnet: connect to address ::1: Connection refused Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. hello world spark hadoop hello OK ``` 查看Spark Streaming程序的输出结果如下 ```txt ------------------------------------------- Time: 1611154690000 ms ------------------------------------------- (spark,1) (hadoop,1) (hello,2) (world,1) ``` <br/> # 2. Pull方式 1. 编写Flume的配置文件 ```conf # 定义 source, channel, 和sink的名字 a1.sources = s1 a1.channels = c1 a1.sinks = spark # 对source的一些设置 a1.sources.s1.type = netcat a1.sources.s1.bind = localhost a1.sources.s1.port = 5678 a1.sources.s1.channels = c1 # 对channel的一些设置 a1.channels.c1.type = memory # 对sink的一些设置 # 需要将spark-streaming-flume_2.11-2.4.4.jar包拷贝到$FLUME_HOME/lib a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.spark.hostname = hadoop101 a1.sinks.spark.port = 9999 a1.sinks.spark.channel = c1 ``` 2. 上传Flume sink必要的包到`$FLUME_HOME/lib`目录【一共6个】 ```txt avro-1.8.2.jar avro-ipc-1.8.2.jar commons-lang3-3.5.jar scala-library-2.11.8.jar spark-streaming-flume_2.11-2.4.4.jar spark-streaming-flume-sink_2.11-2.4.4.jar ``` 我安装的flume默认已经有下面版本的jar包,为了防止冲突所以需要将其删除 ```shell [root@hadoop101 lib]# rm -rf avro-1.7.4.jar [root@hadoop101 lib]# rm -rf avro-ipc-1.7.4.jar [root@hadoop101 lib]# rm -rf commons-lang-2.5.jar [root@hadoop101 lib]# rm -rf scala-library-2.10.5.jar ``` ```shell [root@hadoop101 flume-sink]# ls | grep spark spark-streaming-flume_2.11-2.4.4.jar spark-streaming-flume-sink_2.11-2.4.4.jar ``` 3. 编写Spark Streaming程序 ```scala package streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @Date 2021/2/8 */ object FlumePullWordCount { def main(args: Array[String]): Unit = { /** ********* 1. 创建StreamingContext ************/ val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]") // Seconds(5)是批处理间隔,即将5秒内新收集的数据作为一个单位进行处理 val ssc = new StreamingContext(conf, Seconds(5)) /** ********* 2. 加载数据 ************/ // FlumeUtils.createPollingStream(StreamingContext, hostname, port) val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, "hadoop101", 9999) val lines: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()).trim) /** ************ 3. 进行统计 **************/ val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) result.print() // 打印输出 /** ********* 4. 启动Streaming程序 ************/ ssc.start() /** *********** 5. 等待应用程序终止 ****************/ ssc.awaitTermination() } } ``` 4. 启动Flume ```shell [root@hadoop101 flume]# bin/flume-ng agent --name a1 -f myconf/sink_spark-pull.conf -Dflume.root.logger=INFO,console ``` 5. 将上面的Spark Streaming打成jar包,上传到集群运行 ```shell [root@hadoop101 spark]# bin/spark-submit --class streaming.FlumePullWordCount /opt/software/streaming-1.0-SNAPSHOT-jar-with-dependencies.jar ``` 6. 启动`telnet`并输入数据 ```shell [root@hadoop101 /]# telnet localhost 5678 Trying ::1... telnet: connect to address ::1: Connection refused Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. hello world spark hadoop hello OK ``` 查看Spark Streaming程序的输出结果如下 ```txt ------------------------------------------- Time: 1611154690000 ms ------------------------------------------- (spark,1) (hadoop,1) (hello,2) (world,1) ```