3.4 Spark RDD Action操作3-聚合-aggregate、fold、reduce

云计算 waitig 561℃ 百度已收录 0评论

1 aggregate
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。
例子:
(1)

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.mapPartitionsWithIndex{
        (partIdx,iter) => {
          var part_map = scala.collection.mutable.Map[String,List[Int]]()
            while(iter.hasNext){
              var part_name = "part_" + partIdx;
              var elem = iter.next()
              if(part_map.contains(part_name)) {
                var elems = part_map(part_name)
                elems ::= elem
                part_map(part_name) = elems
              } else {
                part_map(part_name) = List[Int]{elem}
              }
            }
            part_map.iterator

        }
      }.collect
res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))
##第一个分区中包含5,4,3,2,1
##第二个分区中包含10,9,8,7,6

scala> rdd1.aggregate(1)(
                {(x : Int,y : Int) => x + y}, 
                {(a : Int,b : Int) => a + b}
          )
res17: Int = 58
结果为什么是58,看下面的计算过程:
##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1
##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16
## part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41
##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1
##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58

(2)

    scala> rdd1.aggregate(2)(
                  {(x : Int,y : Int) => x + y}, 
                    {(a : Int,b : Int) => a * b}
           )
    res18: Int = 1428

##这次zeroValue=2
##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
##最后:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428
因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。

(3)

aggregate函数将每个分区里面的元素进行聚合(seqOp),然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
scala> def seqOP(a:Int, b:Int) : Int = {
         val r = a*b
         println("seqOp: " + a + "\t" + b+"=>"+r)
         r
        }
seqOP: (a: Int, b: Int)Int

scala>   def combOp(a:Int, b:Int): Int = {
          val r= a+b
         println("combOp: " + a + "\t" + b+"=>"+r)
        r
        }
combOp: (a: Int, b: Int)Int

scala> val z = sc. parallelize ( List (1 ,2 ,3 ,4 ,5 ,6) , 2)
z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:27

scala> z. aggregate(3)(seqOP, combOp)
combOp: 3   18=>21
combOp: 21  360=>381
res20: Int = 381

计算流程:
1、对List(1,2,3,4,5,6)分区,分成(1,2,3)(4,5,62、对(1,2,3)执行seqOp方法:
3(初始值)*1=>3
3(上轮计算结果)*2=>6
6*3=>18
     对(4,5,6)执行seqOp方法
3(初始值)*4=>12
12(上轮计算结果)*5=>60
60*6=>360
3、对分区结果惊醒combine操作
3(初始值)+18(分区结果)=>21
21(上轮计算结果)+360(分区结果) =>381
注意:
1、reduce函数和combine函数必须满足交换律(commutative)和结合律(associative)
2、从aggregate 函数的定义可知,combine函数的输出类型必须和输入的类型一致

2 fold
def fold(zeroValue: T)(op: (T, T) ⇒ T): T
fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。
例子:

scala> rdd1.fold(1)(
            (x,y) => x + y    
          )
res19: Int = 58

##结果同上面使用aggregate的第一个例子一样,即:
scala> rdd1.aggregate(1)(
                {(x,y) => x + y}, 
                {(a,b) => a + b}
          )
res20: Int = 58

3 reduce
def reduce(f: (T, T) ⇒ T): T
根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。
例子:

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21

scala> rdd1.reduce(_ + _)
res18: Int = 55

scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21

scala> rdd2.reduce((x,y) => {
            (x._1 + y._1,x._2 + y._2)
          })
res21: (String, Int) = (CBBAA,6)

本文由【waitig】发表在等英博客
本文固定链接:3.4 Spark RDD Action操作3-聚合-aggregate、fold、reduce
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)