合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
Spark Streaming支持如下的数据源: * 基础数据源:文件系统、Socket连接、RDD队列等 * 高级数据源:Kafka、Flume、Kinesis等 * 自定义数据源 ```scala // 文件系统 def textFileStream(directory: String): DStream[String] // 注意textFileStream()参数必须是文件目录, 但可以支持通配符如"hdfs://namenode:8020/logs/2017/*"。 // Spark 将监视该目录任务新建的文件,一旦有新文件才会处理。所有文件要 // 求有相同的数据格式,并且监视文件的修改时间而不是创建时间,注意更新文件 // 内容不会被监视,一旦开始处理,这些文件必须不能再更改,因此如果文件被连 // 续地追加,新的数据也不会被读取。文件流不需要运行接收器,因此,不需要分配内核。 // Socket,在【StreamingContext API】给出的示例用的就是Socket源 def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel): ReceiverInputDStream[String] // RDD队列 //创建一个能够 push 到 QueueInputDStream 的 RDDs 队列 val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]() //基于一个 RDD 队列创建一个输入源 val inputStream = ssc.queueStream(rddQueue) // Flume Sink val ds = FlumeUtils.createPollingStream(streamCtx, [sink hostname], [sink port]); // Kafka Consumer val ds = KafkaUtils.createStream(streamCtx, zooKeeper, consumerGrp, topicMap); ```