合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
* Spark SQL内置的函数都在 `org.apache.spark.sql.functions`中(functions是一个object,不是一个package). * 内置函数大致分类如下: ```scala 类别 函数举例 聚合函数 count(),countDistinct(),avg(),max(),min() 集合函数 sort_array、explode 日期、时间函数 hour、quarter、next_day 数学函数 asin、atan、sqrt、tan、round 开窗函数 row_number 字符串函数 concat、format_number、regexp_extract 其他函数 isNaN、sha、randn、callUDF ``` 下面是内置函数使用的一个例子: ```scala import org.apache.spark.SparkContext import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} object InnerFun { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getName) .getOrCreate() val sc: SparkContext = spark.sparkContext import spark.implicits._ // 示例数据 val accessLog = Array( "2016-12-27,001", "2016-12-27,001", "2016-12-27,002", "2016-12-28,003", "2016-12-28,004", "2016-12-28,002", "2016-12-28,002", "2016-12-28,001" ) // 创建DataFrame val accessLogRDD = sc.parallelize(accessLog).map(row => { val splited = row.split(",") Row(splited(0), splited(1).toInt) }) val structTypes = StructType(Array( StructField("day", StringType, true), StructField("userId", IntegerType, true) )) val accessLogDF = spark.createDataFrame(accessLogRDD, structTypes) accessLogDF.show() // +----------+------+ // | day|userId| // +----------+------+ // |2016-12-27| 1| // |2016-12-27| 1| // |2016-12-27| 2| // |2016-12-28| 3| // |2016-12-28| 4| // |2016-12-28| 2| // |2016-12-28| 2| // |2016-12-28| 1| // +----------+------+ // 导入Spark SQL内置的函数 import org.apache.spark.sql.functions._ //求每天所有的访问量(pv) accessLogDF.groupBy("day").agg(count("userId").as("pv")) .show() // +----------+---+ // | day| pv| // +----------+---+ // |2016-12-28| 5| // |2016-12-27| 3| // +----------+---+ //求每天的去重访问量(uv) accessLogDF.groupBy("day").agg(countDistinct("userId").as("uv")) .show() // +----------+---+ // | day| uv| // +----------+---+ // |2016-12-28| 4| // |2016-12-27| 2| // +----------+---+ } } ```