天天看点

Learning Spark笔记7-聚合

聚合

当数据集中的数据是key/value时,通常是使用相同的键汇总所有元素的统计信息。

reduceByKey()类似于reduce();他们都是使用函数来合并值。reduceByKey()运行几个并行的reduce操作,每个操作合并相同的键的值。因为数据集有大量的key,reduceByKey()没有实现一个动作返回一个值。相反的,它返回一个新的RDD,包含键和reduce后的值。

foldByKey()与fold()类似;

我们可以使用reduceByKey()和mapValues()来计算每个关键字的平均值,方式与如何使用fold()和map()来计算整个RDD平均值相似。

Example 4-7. Per-key average with reduceByKey() and mapValues() in Python

rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

Example 4-8. Per-key average with reduceByKey() and mapValues() in Scala

rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

我们可以在示例4-9到4-11中使用类似的方法来实现经典的分布式单词计数问题。 我们将使用上一章中的flatMap(),以便我们可以生成一对RDD单词和数字1,然后使用reduceByKey()将所有单词合并在一起,如实例4-7和4-8所示。

Example 4-9. Word count in Python

rdd = sc.textFile("s3://...")

words = rdd.flatMap(lambda x: x.split(" "))

result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

Example 4-10. Word count in Scala

val input = sc.textFile("s3://...")

val words = input.flatMap(x => x.split(" "))

val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)

Example 4-11. Word count in Java

JavaRDD<String> input = sc.textFile("s3://...")

JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {

 public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); }

});

JavaPairRDD<String, Integer> result = words.mapToPair(

 new PairFunction<String, String, Integer>() {

 public Tuple2<String, Integer> call(String x) { return new Tuple2(x, 1); }

}).reduceByKey(

 new Function2<Integer, Integer, Integer>() {

 public Integer call(Integer a, Integer b) { return a + b; }

});

实际上我们可以直接在第一个RDD上使用input.flatMap(x =>x.split(" ")).countByValue()来计算单词计数。

combineByKey()是最常用的每键聚合函数。大多数的每键combiner都使用它来实现。像是aggregate(),combineByKey()允许用户返回与输入数据不同的值。

通过key来组合数据我们的选择有很多。大多数都是在combineByKey()之上实现的。在任何情况下,使用Spark指定的聚合函数都比分组然后合并操作更快。

调整并行度

到现在为止,我们讨论的所有转换都是分布式的,但是我们没有真正看到Spark怎么样切分工作。当在RDD上执行操作的时候,每个RDD都有固定的分区数量来决定并行度。

当执行聚合或分组操作时,我们可以让Spark使用指定数量的分区。Spark总是会根据您的群集大小来推测一个合理的默认值,但在某些情况下,您将需要调整并行级别以获得更好的性能。

这章讨论的大多数操作,当创建一个分组的或聚合的RDD时,都接受第二个参数用来指定分区个数

Example 4-15. reduceByKey() with custom parallelism in Python

data = [("a", 3), ("b", 4), ("a", 1)]

sc.parallelize(data).reduceByKey(lambda x, y: x + y) # Default parallelism

sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10) # Custom parallelism

Example 4-16. reduceByKey() with custom parallelism in Scala

val data = Seq(("a", 3), ("b", 4), ("a", 1))

sc.parallelize(data).reduceByKey((x, y) => x + y) // Default parallelism

sc.parallelize(data).reduceByKey((x, y) => x + y) // Custom parallelism

有时候我们想要在分组和聚合操作之外来改变RDD的分区数量,Spark提供了repartition()函数,该函数可以在互联网上混洗数据然后创建一个新的分区集合。要记住的是重新分区数据的操作是非常昂贵的。Spark有一个优化版本coalesce(),当减少RDD分区数量时,避免移动数据。想要知道你是否可以安全的调用coalesce(),可以检查RDD的大小在java/scala中使用rdd.partitions.size()或在Python使用rdd.getNumPartitions(),确保您将它合并到比目前拥有的分区更少的分区上。

继续阅读