Spark Functions Explained: aggregate

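The example in this article comes from http://apachesparkbook.blogspot.hk. I have added my own commentary in the hope that it helps you understand how the aggregate function works.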

Syntax

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.

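As I understand it, seqOp folds the elements of each partition into a result, starting from the initial value zeroValue, and combOp then merges the per-partition results (this merge also starts from zeroValue, so it is applied once more). It is somewhat like reduce, except that it can return a different type. See the example below for details.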
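The two steps described above can be sketched in plain Scala, without Spark. This is only a model under stated assumptions: aggregateModel is a hypothetical helper (not a Spark API), partitions are represented as nested in-memory sequences, and the (sum, count) accumulator is an illustrative choice to show a result type U that differs from the element type T. The input reuses the three pairs from the example in this article.

```scala
object AggregateModel {
  // Hypothetical helper, not part of Spark: models the semantics of
  // RDD.aggregate over in-memory "partitions".
  def aggregateModel[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U = {
    // Step 1: within each partition, fold the elements into a U,
    // starting from zeroValue.
    val partials: Seq[U] = partitions.map(_.foldLeft(zeroValue)(seqOp))
    // Step 2: merge the per-partition results, again starting from zeroValue.
    partials.foldLeft(zeroValue)(combOp)
  }

  // T = (String, Int); one element per partition.
  val partitions: Seq[Seq[(String, Int)]] =
    Seq(Seq(("maths", 21)), Seq(("english", 22)), Seq(("science", 31)))

  // U = (Int, Int): a running (sum, count), with the neutral zero value (0, 0).
  val sumAndCount: (Int, Int) =
    aggregateModel(partitions, (0, 0))(
      (acc, value) => (acc._1 + value._2, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))

  def main(args: Array[String]): Unit =
    println(sumAndCount) // (74,3)
}
```

The model merges the partition results in a fixed order. Real Spark merges them as tasks finish, so combOp should be associative and commutative.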

Example

In this example the elements of the RDD have type (String, Int), while the result type is Int.

scala> val inputrdd = sc.parallelize(
    |    List(
    |       ("maths", 21),
    |       ("english", 22),
    |       ("science", 31)
    |    ),
    |    3
    | )
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:21

scala> inputrdd.partitions.size
res18: Int = 3

scala> val result = inputrdd.aggregate(3) (
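    |    /*
    |     * seqOp folds a T into a U,
    |     * here a (String, Int) into an Int
    |     * (it takes the Int part of the (String, Int) pair and returns an Int).
    |     * Parameters:
    |     * acc   :  the result accumulated so far
    |     * value :  the current element of the input RDD;
    |     *          in our example its type is (String, Int)
    |     * Return value:
    |     * an Int
    |     */
    |    (acc, value) => (acc + value._2),
    |
    |    /*
    |     * combOp merges the seqOp results of the individual partitions
    |     * (i.e. two Ints)
    |     */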
    |    (acc1, acc2) => (acc1 + acc2)
    | )
result: Int = 86

Let's walk through the computation:

Partition 1 : zeroValue + value._2 = 3 + 21 = 24
Partition 2 : zeroValue + value._2 = 3 + 22 = 25
Partition 3 : zeroValue + value._2 = 3 + 31 = 34
Result = zeroValue + Partition 1 + Partition 2 + Partition 3 = 3 + 24 + 25 + 34 = 86

With a single partition:

Partition 1 : zeroValue + value._2 for each element in turn = 3 + 21 + 22 + 31 = 77
Result = zeroValue + Partition 1 = 3 + 77 = 80
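The two walkthroughs can be checked with a small plain-Scala sketch that needs no Spark. It assumes the same fold-then-merge behaviour described in this article. The names ZeroValueDemo and sumScores are hypothetical, and partitions are modelled as nested sequences. With a zero value of 3 the result depends on the number of partitions, because the zero value is added once per partition and once more in the final merge. With the neutral value 0 both layouts give the plain sum.

```scala
object ZeroValueDemo {
  val data: Seq[(String, Int)] =
    Seq(("maths", 21), ("english", 22), ("science", 31))

  // Hypothetical helper: fold each partition starting from zero, then merge
  // the per-partition sums, again starting from zero.
  def sumScores(partitions: Seq[Seq[(String, Int)]], zero: Int): Int = {
    val perPartition =
      partitions.map(_.foldLeft(zero)((acc, value) => acc + value._2))
    perPartition.foldLeft(zero)(_ + _)
  }

  val threePartitions: Seq[Seq[(String, Int)]] = data.map(Seq(_)) // one element each
  val onePartition: Seq[Seq[(String, Int)]] = Seq(data)           // everything together

  def main(args: Array[String]): Unit = {
    println(sumScores(threePartitions, 3)) // 86 = 74 + 3 * (3 + 1)
    println(sumScores(onePartition, 3))    // 80 = 74 + 3 * (1 + 1)
    println(sumScores(threePartitions, 0)) // 74
    println(sumScores(onePartition, 0))    // 74
  }
}
```

This is why the documentation asks for a neutral zero value: for addition that is 0, so the result does not change when the data is repartitioned.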

More examples can be found here
