合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
* Parquet文件:是一种流行的列式存储格式,以二进制存储,文件中包含数据与元数。 * Parquet是Spark默认的存储格式。 ```scala import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType} object ParquetSource { def main(args: Array[String]): Unit = { val spark:SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getName) .getOrCreate() val sc:SparkContext = spark.sparkContext import spark.implicits._ /***** Spark SQL 写 parquet文件 *****/ // 1. 定义数据结构(Schema) val schema = StructType(Array( StructField("name", StringType), StructField("favorite_color", StringType), StructField("favorite_numbers", ArrayType(IntegerType)) )) // 2. 创建DataFrame val rdd = sc.parallelize(List(("Alyssa", null, Array(3, 9, 15, 20)), ("Ben", "red", null))) val rowRDD:RDD[Row] = rdd.map(p=>Row(p._1, p._2, p._3)) val df1 = spark.createDataFrame(rowRDD, schema) // 3. 将数据写入.parquet文件中 // 文件已经存在则报错 // 在E:\hadoop\output\目录下会创建多个类似于 // part-00003-3c959916-6a4a-410a-bca9-e1bd56953107.c000.snappy.parquet的文件 df1.write.partitionBy("name").parquet("file:///E:\\hadoop\\output") // 也可以根据某一个字段指定写入到哪个分区 // df1.write.partitionBy("name").parquet("file:///E:\\hadoop\\output") /***** Spark SQL 读 parquet文件 *****/ val df2 = spark.read.parquet("file:///E:\\hadoop\\output") df2.show() // +--------------+----------------+------+ // |favorite_color|favorite_numbers| name| // +--------------+----------------+------+ // | null| [3, 9, 15, 20]|Alyssa| // | red| null| Ben| // +--------------+----------------+------+ df2.printSchema() // root // |-- favorite_color: string (nullable = true) // |-- favorite_numbers: array (nullable = true) // | |-- element: integer (containsNull = true) // |-- name: string (nullable = true) } } ```