合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
自定义数据源即自定义 Receiver。自定义接收器必须通过实现两个方法来扩展 Receiver 抽象类。 * onStart():开始接收数据要做的事情。 * onStop():停止接收数据的操作。 onStart()和 onStop()不能无限阻塞。通常,onStart()将启动负责接收数据的线程,而 onStop()将确保这些接收数据的线程被停止。接收线程也可以使用 isStopped() 方法来检查它们是否应该停止接收数据。 <br/> 一旦接收到数据,就可以通过调用 store(data)将数据存储在 Spark 中,这是Receiver 类提供的方法。store()有多种形式,可以一次存储接收到的数据记录,也可以作为对象/序列化字节的整个集合。注意,用于实现接收器的 store() 的风格会影响其可靠性和容错语义。 <br/> 接收线程中的任何异常都应该被捕获并正确处理,以避免接收方的无声故障。restart() 将通过异步调用 onStop()和延迟后调用 onStart()来重新启动接收器。stop()将调用 onStop()并终止接收方。此外reportError() 在不停止/重新启动接收器的情况下向驱动程序报告错误消息(在日志和 UI 中可见)。 <br/> 下面是一个自定义 Socket 接收器示例。 ```scala import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import java.nio.charset.StandardCharsets import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.receiver.Receiver class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { def onStart() { //启动接收线程 new Thread("Socket Receiver") { override def run() { receive() } }.start() } def onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself if isStopped() returns false } /** 创建 Socket 连接并接收数据直到 receiver 停止 */ private def receive() { var socket: Socket = null var userInput: String = null try { // 连接到 host:port socket = new Socket(host, port) //读 Socket val reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while (!isStopped && userInput != null) { store(userInput) //存储接收到的数据,抽象类已实现 userInput = reader.readLine() } reader.close() socket.close() // 重新启动,当服务器再次激活时尝试重新连接 restart("Trying to connect again") } catch { case e: java.net.ConnectException => // 如果无法连接到服务器,重新启动 restart("Error connecting to " + host + ":" + port, e) case t: Throwable => // 如果有任何其他错误,重新启动 restart("Error receiving data", t) } } } object SparkDemo1 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]"); // Seconds(5)是批处理间隔,即将5秒内新收集的数据作为一个单位进行处理 val ssc: StreamingContext = new StreamingContext(conf, Seconds(5)) val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port)) val words = customReceiverStream.flatMap(_.split(" ")) } } ```