企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
使用MySQL数据库作为数据源。<br/> (1)在指定MySQL驱动包 ```shell -- 将MySQL的驱动包放到 {SPARK_HOME}/conf/mysql-connector-java-5.1.27.jar -- 或者使用spak-shell命令指定驱动包 [root@hadoop101 spark]# spark-shell --jars /opt/software/mysql-connector-java-5.1.38.jar ``` (2)scala代码 记得启动MySQL服务。 ```scala import java.util.Properties import org.apache.spark.sql.{DataFrame, SparkSession} object MysqlSource { def main(args: Array[String]): Unit = { val spark:SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getName) .getOrCreate() // 1. MySQL相关配置 val url = "jdbc:mysql://hadoop101:3306/sqoop_db" val tableName = "orders" val props = new Properties() props.setProperty("user", "root") props.setProperty("password", "123456") props.setProperty("driver", "com.mysql.jdbc.Driver") // 2. 读取MySQL中的数据创建DataFrame val df:DataFrame = spark.read.jdbc(url, tableName, props) df.show(5) // +--------+-------------------+-----------------+---------------+ // |order_id| order_date|order_customer_id| order_status| // +--------+-------------------+-----------------+---------------+ // | 1|2013-07-25 00:00:00| 11599| CLOSED| // | 2|2013-07-25 00:00:00| 256|PENDING_PAYMENT| // | 3|2013-07-25 00:00:00| 12111| COMPLETE| // | 4|2013-07-25 00:00:00| 8827| CLOSED| // | 5|2013-07-25 00:00:00| 11318| COMPLETE| // +--------+-------------------+-----------------+---------------+ // 3. 也可以按照mode将写数据到MySQL df.write.mode("overwrite").jdbc(url, tableName, props) } } ```