企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 1. 图常用算子 ![](https://img.kancloud.cn/fc/92/fc92b71f52c778519fbd1cbb3b2b0fbc_1009x396.png) ```scala import org.apache.spark.SparkContext import org.apache.spark.graphx.{Edge, Graph, VertexId} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession object GraphxFun { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getName) .getOrCreate() val sc: SparkContext = spark.sparkContext import spark.implicits._ // 1. 构建点集合 val users: RDD[(Long, (String, Int))] = sc.parallelize(Array( (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50)) )) // 2. 构建边集合 val cntCall: RDD[Edge[Int]] = sc.parallelize(Array( Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3), Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3) )) // 3. 构图 val graph: Graph[(String, Int), Int] = Graph(users, cntCall) graph.triplets.foreach(x => println(x.toString())) // ((5,(Ed,55)),(3,(Charlie,65)),8) // ((2,(Bob,27)),(1,(Alice,28)),7) // ((4,(David,42)),(1,(Alice,28)),1) // ((3,(Charlie,65)),(2,(Bob,27)),4) // ((5,(Ed,55)),(2,(Bob,27)),2) // ((2,(Bob,27)),(4,(David,42)),2) // ((5,(Ed,55)),(6,(Fran,50)),3) // ((3,(Charlie,65)),(6,(Fran,50)),3) println() /** ********** mapXXX算子 ********************/ // 通过遍历该图的所有顶点,生成一个新的图 // 可以改变顶点的attr,并生成一个新的Graph返回,但顶点的Id改变不了 val graph2: Graph[(VertexId, String), Int] = graph.mapVertices((vertexId, attr) => (vertexId * 100, attr._1)) // 也可以采用如下写法,结果是一样的 // val graph2: Graph[(VertexId, String), Int] = graph.mapVertices({ case (vertexId, (name, age)) => (vertexId * 100, name) }) graph2.triplets.foreach(x => println(x.toString())) // ((2,(200,Bob)),(1,(100,Alice)),7) // ((2,(200,Bob)),(4,(400,David)),2) // ((5,(500,Ed)),(3,(300,Charlie)),8) // ((3,(300,Charlie)),(2,(200,Bob)),4) // ((4,(400,David)),(1,(100,Alice)),1) // ((5,(500,Ed)),(2,(200,Bob)),2) // ((5,(500,Ed)),(6,(600,Fran)),3) // ((3,(300,Charlie)),(6,(600,Fran)),3) println() // 遍历该图的所有边,生成一个新的图 // 只能改变边的attr,点什么都不能改变 val graph3: Graph[(String, Int), Int] = graph.mapEdges(e => e.attr * 100) graph3.triplets.foreach(x => println(x.toString())) // ((2,(Bob,27)),(1,(Alice,28)),700) // ((2,(Bob,27)),(4,(David,42)),200) // ((3,(Charlie,65)),(2,(Bob,27)),400) // ((4,(David,42)),(1,(Alice,28)),100) // ((5,(Ed,55)),(2,(Bob,27)),200) // ((3,(Charlie,65)),(6,(Fran,50)),300) // ((5,(Ed,55)),(3,(Charlie,65)),800) // ((5,(Ed,55)),(6,(Fran,50)),300) println() /** ************ 结构算子 ***************/ // 将该图所有边的方向反转,并生成新的Graph val graph4: Graph[(String, Int), Int] = graph.reverse graph4.triplets.foreach(println) // ((1,(Alice,28)),(2,(Bob,27)),7) // ((2,(Bob,27)),(3,(Charlie,65)),4) // ((3,(Charlie,65)),(5,(Ed,55)),8) // ((1,(Alice,28)),(4,(David,42)),1) // ((6,(Fran,50)),(5,(Ed,55)),3) // ((6,(Fran,50)),(3,(Charlie,65)),3) // ((4,(David,42)),(2,(Bob,27)),2) // ((2,(Bob,27)),(5,(Ed,55)),2) println() // 生成满足顶点条件的子图 val graph6: Graph[(String, Int), Int] = graph.subgraph(vpred = (id, attr) => attr._2 < 65) graph6.triplets.foreach(println) // ((5,(Ed,55)),(6,(Fran,50)),3) // ((2,(Bob,27)),(1,(Alice,28)),7) // ((2,(Bob,27)),(4,(David,42)),2) // ((4,(David,42)),(1,(Alice,28)),1) // ((5,(Ed,55)),(2,(Bob,27)),2) println() /** ************* join算子 ****************/ // 构建顶点集合 val vertices: RDD[(VertexId, String)] = sc.parallelize(Array((1L, "kgc.cn"), (2L, "baidu.com"), (3L, "google.com"))) // 内连接,根据顶点Id相等进行join,并生成连接后的图 // id: 图中顶点的Id与vertices顶点Id的交集 // attr: 图中顶点的attr // company: vertices顶点的attr val graph7: Graph[(String, Int), Int] = graph.joinVertices(vertices)((id, attr, company) => (id * 100 + "@" + company, attr._2)) graph7.triplets.foreach(println) // ((3,(300@google.com,65)),(2,(200@baidu.com,27)),4) // ((3,(300@google.com,65)),(6,(Fran,50)),3) // ((5,(Ed,55)),(3,(300@google.com,65)),8) // ((5,(Ed,55)),(6,(Fran,50)),3) // ((4,(David,42)),(1,(100@kgc.cn,28)),1) // ((5,(Ed,55)),(2,(200@baidu.com,27)),2) // ((2,(200@baidu.com,27)),(1,(100@kgc.cn,28)),7) // ((2,(200@baidu.com,27)),(4,(David,42)),2) println() // 外连接, // Id: 图顶点Id和vertices顶点Id的并集 // attr: 图顶点的attr // company: vertices顶点attr,图顶点Id不等于vertices中的顶点Id,则company默认为None val graph8: Graph[(String, Int), Int] = graph.outerJoinVertices(vertices)((id, attr, company) => (id * 100 + "#" + company, attr._2)) graph8.triplets.foreach(println) // ((4,(400#None,42)),(1,(100#Some(kgc.cn),28)),1) // ((5,(500#None,55)),(2,(200#Some(baidu.com),27)),2) // ((5,(500#None,55)),(3,(300#Some(google.com),65)),8) // ((5,(500#None,55)),(6,(600#None,50)),3) // ((2,(200#Some(baidu.com),27)),(1,(100#Some(kgc.cn),28)),7) // ((2,(200#Some(baidu.com),27)),(4,(400#None,42)),2) // ((3,(300#Some(google.com),65)),(2,(200#Some(baidu.com),27)),4) // ((3,(300#Some(google.com),65)),(6,(600#None,50)),3) } } ``` 下面举两个图算子应用的案例,帮助理解算子用途。 <br/> # 2. 案例一:计算用户的粉丝数量 ![](https://img.kancloud.cn/fc/92/fc92b71f52c778519fbd1cbb3b2b0fbc_1009x396.png) 顶点的入度就是这个用户的粉丝数量。 ```scala import org.apache.spark.SparkContext import org.apache.spark.graphx.{Edge, Graph} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession object ComputeFanNum { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getName) .getOrCreate() val sc: SparkContext = spark.sparkContext import spark.implicits._ // 1. 构造顶点RDD val users: RDD[(Long, (String, Int))] = sc.parallelize(Array( (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 55)), (5L, ("Ed", 55)), (6L, ("Fran", 50)) )) // 2. 构造边RDD val cntCall: RDD[Edge[Int]] = sc.parallelize(Array( Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3), Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3) )) // 3. 构图 val graph: Graph[(String, Int), Int] = Graph(users, cntCall) case class User(name: String, age: Int, inDeg: Int, outDeg: Int) // 将顶点attr转换为User val initUserGraph: Graph[User, Int] = graph.mapVertices({ case (id, (name, age)) => User(name, age, 0, 0) }) val userGraph: Graph[User, Int] = initUserGraph.outerJoinVertices(initUserGraph.inDegrees)({ case (id, attr, inDegOpt) => User(attr.name, attr.age, inDegOpt.getOrElse(0), 0) }).outerJoinVertices(initUserGraph.outDegrees)({ case (id, attr, outDegOpt) => User(attr.name, attr.age, attr.inDeg, outDegOpt.getOrElse(0)) }) userGraph.vertices.foreach(x => println(s"用户${x._1}是${x._2.name}他拥有${x._2.inDeg}个粉丝.")) // 用户4是David他拥有1个粉丝. // 用户1是Alice他拥有2个粉丝. // 用户6是Fran他拥有2个粉丝. // 用户3是Charlie他拥有1个粉丝. // 用户2是Bob他拥有2个粉丝. // 用户5是Ed他拥有0个粉丝. } } ``` <br/> # 3. 案例二:谁是网络红人 (1)案例数据格式 ```txt ((被跟随者), (跟随者)) ((User47,86566510),(User83,15647839)) ((User47,86566510),(User42,197134784)) ((User89,74286565),(User49,19315174)) ((User16,22679419),(User69,45705189)) ``` (2)案例要求 创建图并计算每个用户的粉丝数量,找出谁才是网络红色。 (3)案例代码 ```scala import org.apache.spark.SparkContext import org.apache.spark.graphx.{Edge, Graph} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import scala.util.matching.Regex object InternetCelebrityGraphx { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder() .master("local[4]") .appName(this.getClass.getName) .getOrCreate() val sc: SparkContext = spark.sparkContext import spark.implicits._ // 匹配((User47,86566510),(User83,15647839))的正则表达式 val pattern: Regex = """\(\((User\d+,\d+)\),\((User\d+,\d+)\)\)""".r // 加载数据文件,分割出(用户名, 用户Id) val twitters: RDD[(Array[String], Array[String])] = sc.textFile("file:///E:\\hadoop\\input\\twitter_graph_data.txt") .map(line => line match { case pattern(followee, follower) => (Some(followee), Some(follower)) case _ => (None, None) // 将None值过滤 }).filter(x => x._1 != None && x._2 != None) // 将(User47,86566510)分割出Array(用户名, Id) .map(x => (x._1.get.split(","), x._2.get.split(","))) twitters.toDF.show(3) // +------------------+-------------------+ // | _1| _2| // +------------------+-------------------+ // |[User47, 86566510]| [User83, 15647839]| // |[User47, 86566510]|[User42, 197134784]| // |[User89, 74286565]| [User49, 19315174]| // +------------------+-------------------+ // 构建顶点RDD val verts: RDD[(Long, String)] = twitters.flatMap(x => Array((x._1(1).toLong, x._1(0)), (x._2(1).toLong, x._2(0)))).distinct() verts.toDF.show(3) // +--------+------+ // | _1| _2| // +--------+------+ // |38521400|User85| // |14676022| User0| // |24741685|User87| // +--------+------+ // 构建边RDD val edges: RDD[Edge[String]] = twitters.map(x => Edge(x._2(1).toLong, x._1(1).toLong, "follow")) // 构建图有可能会出现一种情况, 在边集合中出现的点在点集合中不存在,所以提供一个默认值 "" val graph: Graph[String, String] = Graph(verts, edges, "") // 谁是网络红人,就看哪个顶点的入口多,按照降序排序 graph.inDegrees.repartition(1).sortBy(x => x._2, false).toDF.show(5) // +---------+---+ // | _1| _2| // +---------+---+ // | 36851222| 56| // |123004655| 56| // | 59804598| 54| // | 63644892| 46| // | 14444530| 42| // +---------+---+ } } ```