企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务 (Task) 和任务控制节点 (Driver Program) 之间共享变量。 为了满足这种需求,Spark 提供了两种类型的变量: * **累加器 (accumulators)**:累加器支持在所有不同节点之间进行累加计算 (比如计数或者求和)。 * **广播变量 (broadcast variables)**:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。 ## 广播变量 广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。例如,利用广播变量,我们能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。(Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.)Spark也尝试着利用有效的广播算法去分配广播变量,以减少通信的成本。 一个广播变量可以通过调用`SparkContext.broadcast(v)`方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过`value`方法访问,下面的代码说明了这个过程: ```scala scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3) ``` 广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量v,这样我们就不需要再次传递变量v到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改。 **一个相对完整的例子** ``` import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object BroadcastVariablesTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN") //不使用广播变量 val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape"))) val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap //scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana) val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3)) //根据水果编号取水果名称 val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x)) fruitNames.foreach(println) //注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多, //那么会导致,被各个Task共用到的fruitMap会被多次传输 //应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可 //如何做到?---使用广播变量 //注意:广播变量的值不能被修改,如需修改可以将数据存到外部数据源,如MySQL、Redis println("=====================") val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap) val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x)) fruitNames2.foreach(println) } } ``` ## 累加器 顾名思义,累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现`counters`和`sums`。Spark原生支持数值类型的累加器,开发者可以自己添加支持的类型。 如果创建了一个具名的累加器,它可以在spark的UI中显示。这对于理解运行阶段(running stages)的过程有很重要的作用。(注意:这在python中还不被支持) 一个累加器可以通过调用`SparkContext.accumulator(v)`方法从一个初始变量v中创建。运行在集群上的任务可以通过`add`方法或者使用`+=`操作来给它加值。然而,它们无法读取这个值。只有驱动程序可以使用`value`方法来读取累加器的值。 如下的代码,展示了如何利用累加器将一个数组里面的所有元素相加: ```scala scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10 ``` 这个例子利用了内置的整数类型累加器。开发者可以利用子类[AccumulatorParam](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.AccumulatorParam)创建自己的 累加器类型。AccumulatorParam接口有两个方法:`zero`方法为你的数据类型提供一个“0 值”(zero value);`addInPlace`方法计算两个值的和。例如,假设我们有一个`Vector`类代表数学上的向量,我们能够 如下定义累加器: ```scala object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam) ``` 在scala中,Spark支持用更一般的[Accumulable](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.Accumulable)接口来累积数据-结果类型和用于累加的元素类型 不一样(例如通过收集的元素建立一个列表)。Spark也支持用`SparkContext.accumulableCollection`方法累加一般的scala集合类型。 **一个相对完整的测试的例子** 通常在向 `Spark` 传递函数时,比如使用 `map()` 函数或者用`filter()`传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果: 语法:`val xx: Accumulator[Int] = sc.accumulator(0)` 示例代码: ``` import org.apache.spark.rdd.RDD import org.apache.spark.{Accumulator, SparkConf, SparkContext} object AccumulatorTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN") //使用scala集合完成累加 var counter1: Int = 0; var data = Seq(1,2,3) data.foreach(x => counter1 += x ) println(counter1)//6 println("+++++++++++++++++++++++++") //使用RDD进行累加(RDD是分布式的) var counter2: Int = 0; val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3] dataRDD.foreach(x => counter2 += x) println(counter2)//0 //注意:上面的RDD操作运行结果是0 //因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量 //而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2 //最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系 //那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊! //如果解决?---使用累加器 val counter3: Accumulator[Int] = sc.accumulator(0) dataRDD.foreach(x => counter3 += x) println(counter3)//6 } } ```