Spark键值对操作
2020-03-14 22:53:12    82    0    0
yuziyue

    Spark 为包含键值对类型的 RDD 提供了一些专有的操作。这些 RDD 被称为 pair RDD。Pair RDD 是很多程序的构成要素,因为它们提供了并行操作各个键或跨节点重新进行数据分组的操作接口。

 

一. 创建Pair RDD

    在 Spark 中有很多种创建 pair RDD 的方式。很多存储键值对的数据格式会在读取时直接返回由其键值对数据组成的 pair RDD。此外,当需要把一个普通的 RDD 转为 pair RDD 时,可以调用 map() 函数来实现,传递的函数需要返回键值对。

 

    在 Scala 中,为了让提取键之后的数据能够在函数中使用,同样需要返回二元组

// 使用第一个单词作为键创建出一个 pair RDD
val pairs = lines.map(x => (x.split(" ")(0), x))

 

二. Pair RDD的转化操作

    Pair RDD 也还是 RDD(元素为 Java 或 Scala 中的 Tuple2 对象或 Python 中的元组),因此同样支持 RDD 所支持的函数。

 

    当数据集以键值对形式组织的时候,聚合具有相同键的元素进行一些统计是很常见的操作。之前讲解过基础 RDD 上的 fold()、combine()、reduce() 等行动操作,pair RDD 上则有相应的针对键的转化操作。Spark 有一组类似的操作,可以组合具有相同键的值。这些操作返回 RDD,因此它们是转化操作而不是行动操作。

 

2.1 mapValues

    mapValues()对pari RDD中的每一个值应用一个函数而不改变键

val d = sc.parallelize(List("beijing", "shanghai")).map(x => (x, 1))
d.mapValues(x => x+1).collect
res37: Array[(String, Int)] = Array((beijing,2), (shanghai,2))


2.2 countByKey

根据key统计,统计目标是键值对的类型。

scala> sc.parallelize(List(("A", 10), ("B", 20), ("A", 30), ("C", 20))).countByKey
res46: scala.collection.Map[String,Long] = Map(A -> 2, B -> 1, C -> 1)


2.3 countByValue

统计出现的次数,统计目标是单一的元素。

scala> sc.parallelize(List("A", "B", "C", "B", "A")).countByValue
res0: scala.collection.Map[String,Long] = Map(A -> 2, B -> 2, C -> 1)


2.4 reduceByKey

    reduceByKey() 与 reduce() 相当类似;它们都接收一个函数,并使用该函数对值进行合并。reduceByKey() 会为数据集中的每个键进行并行的归约操作,每个归约操作会将键相同的值合并起来。因为数据集中可能有大量的键,所以 reduceByKey() 没有被实现为向用户程序返回一个值的行动操作。实际上,它会返回一个由各键和对应键归约出来的结果值组成的新的 RDD。

// 使用reduceByKey()和mapValues()计算每个键对应值的平均值
rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

// 单词计数
sc.textFile("/etc/profile").flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey((x, y) => x + y).foreach(println)

// 单词计数,直接使用countByValue
sc.textFile("/etc/profile").flatMap(x => x.split(" ")).countByValue().foreach(println)

 

2.5 combineByKey

    combineByKey() 是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它实现的。和 aggregate() 一样,combineByKey() 可以让用户返回与输入数据的类型不同的返回值。

// combineByKey 函数的定义如下,我们需要关注的主要是前三个参数
def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null
      )
  • createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
  • mergeValue: (C, V) => C ,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)
  • mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)

下面使用combineByKey来求解平均数的例子:每个科目的平均成绩

val data = sc.parallelize(Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0)))
type MVType = (Int, Double)
data.combineByKey(
  score => (1, score),
  (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)
).map { case (name, (num, socre)) => (name, socre / num) }.collect
  • MVType定义一个元组类型(当前科目出现了多少次,当前科目目前的总分数)
  • score => (1, score),我们把分数作为参数,并返回了附加的元组类型。 以"Fred"为列,当前其分数为88.0 =>(1,88.0)  1表示当前科目的计数器,此时只有一个科目
  • (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),注意这里的c1就是createCombiner初始化得到的(1,88.0)。在一个分区内,我们又碰到了"Fred"的一个新的分数91.0。当然我们要把之前的科目分数和当前的分数加起来即c1._2 + newScore,然后把科目计算器加1即c1._1 + 1
  • (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2),注意"Fred"可能是个学霸,他选修的科目可能过多而分散在不同的分区中。所有的分区都进行mergeValue后,接下来就是对分区间进行合并了,分区间科目数和科目数相加分数和分数相加就得到了总分和总科目数

 

2.6 并行度调优

    每个 RDD 都有固定数目的分区,分区数决定了在 RDD 上执行操作时的并行度。
在执行聚合或分组操作时,可以要求 Spark 使用给定的分区数。Spark 始终尝试根据集群的大小推断出一个有意义的默认值,但是有时候你可能要对并行度进行调优来获取更好的性能表现。

// 自定义分区数
scala> sc.parallelize(data, 2).reduceByKey((x, y) => x + y, 2)

// 默认分区数
scala> sc.parallelize(data).reduceByKey((x, y) => x + y)

// 查看分区数
scala> sc.parallelize(data).reduceByKey((x, y) => x + y, 2).partitions.size
res12: Int = 2

// 改变分区数方法一
scala> sc.parallelize(data).reduceByKey((x, y) => x + y, 2).repartition(3).partitions.size
res14: Int = 3

// 改变分区数方法二
scala> sc.parallelize(data).reduceByKey((x, y) => x + y, 4).coalesce(2).partitions.size
res21: Int = 2

    有时,我们希望在除分组操作和聚合操作之外的操作中也能改变 RDD 的分区。对于这样的情况,Spark 提供了 repartition() 函数。它会把数据通过网络进行混洗,并创建出新的分区集合。切记,对数据进行重新分区是代价相对比较大的操作。Spark 中也有一个优化版的 repartition(),叫作 coalesce()

 

2.7 数据分组groupBy

根据传入的函数进行分组

val a = sc.parallelize(1 to 9)
a.groupBy(x => if (x % 2 == 0) "even" else "old").collect
Array[(String, Iterable[Int])] = Array(
    (even,CompactBuffer(2, 4, 6, 8)),
    (old,CompactBuffer(1, 3, 5, 7, 9)))

val a = sc.parallelize(1 to 9)
a.groupBy(x => if(0<x && x<=3) "a" else if(3<x && x<=6) "b" else "c").collect
Array[(String, Iterable[Int])] = Array(
    (a,CompactBuffer(1, 2, 3)), 
    (b,CompactBuffer(4, 5, 6)), 
    (c,CompactBuffer(7, 8, 9)))


2.8 数据分组groupByKey

对Key-Value形式的RDD的操作。与groupBy类似。但是其分组所用的key不是由指定的函数生成的,而是采用元素本身中的key。

val a = sc.parallelize(List("dog", "tiger", "cat", "spider", "eagle")).keyBy(_.length)
a.collect
Array[(Int, String)] = Array((3,dog), (5,tiger), (3,cat), (6,spider), (5,eagle))

a.groupByKey.collect
Array[(Int, Iterable[String])] = Array(
    (3,CompactBuffer(dog, cat)),
    (5,CompactBuffer(tiger, eagle)),
    (6,CompactBuffer(spider)))

a.groupByKey.map(x => (x._1, x._2.mkString("-"))).collect
Array[(Int, String)] = Array((3,dog-cat), (5,tiger-eagle), (6,spider))

 

 注意:rdd.reduceByKey(func)rdd.groupByKey().mapValues(value => value.reduce(func)) 等价,但是前者更为高效,因为它避免了为每个键创建存放值的列表的步骤。

 

2.9 内连接join

Join类似于SQL的inner join操作,返回结果是前面和后面集合中配对成功的,过滤掉关联不上的。

val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java")))
val rdd2 = sc.makeRDD(Array(("1","30K"),("2","15K"),("3","25K"),("5","10K")))

scala> rdd1.join(rdd2).collect.foreach(println)
(1,(Spark,30K))
(2,(Hadoop,15K))
(3,(Scala,25K))

2.10 左外连接leftOuterJoin

leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。

val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java")))
val rdd2 = sc.makeRDD(Array(("1","30K"),("2","15K"),("3","25K"),("5","10K")))

scala> rdd1.leftOuterJoin(rdd2).collect.foreach(println)
(1,(Spark,Some(30K)))
(2,(Hadoop,Some(15K)))
(3,(Scala,Some(25K)))
(4,(Java,None))

2.11 右外连接rightOuterJoin

rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数也就是右边的RDD为主,关联不上的记录为空

val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java")))
val rdd2 = sc.makeRDD(Array(("1","30K"),("2","15K"),("3","25K"),("5","10K")))

scala> rdd1.rightOuterJoin(rdd2).collect.foreach(println)
(1,(Some(Spark),30K))
(2,(Some(Hadoop),15K))
(3,(Some(Scala),25K))
(5,(None,10K))

 

2.12 排序sortBy和sortByKey

// 根据给定函数排序:value排序
val rdd1 = sc.makeRDD(Array((1,"Spark"),(2,"Hadoop"),(3,"Scala"),(4,"Java")))
rdd1.sortBy(_._2).collect
Array[(Int, String)] = Array((2,Hadoop), (4,Java), (3,Scala), (1,Spark))

// 根据key排序
val rdd1 = sc.makeRDD(Array((1,"Spark"),(2,"Hadoop"),(3,"Scala"),(4,"Java")))
rdd1.sortByKey(ascending=false).collect
Array[(Int, String)] = Array((4,Java), (3,Scala), (2,Hadoop), (1,Spark))

  

三. Pari RDD的行动操作

 下表是Pair RDD的部分行动操作(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)

 

 

四. 数据分区

     在分布式程序中,通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性能。和单节点的程序需要为记录集合选择合适的数据结构一样,Spark 程序可以通过控制 RDD 分区方式来减少通信开销。


     Spark 中所有的键值对 RDD 都可以进行分区。系统会根据一个针对键的函数对元素进行分组。尽管 Spark 没有给出显示控制每个键具体落在哪一个工作节点上的方法(部分原因是 Spark 即使在某些节点失败时依然可以工作),但 Spark 可以确保同一组的键出现在同一个节点上。比如,你可能使用哈希分区将一个 RDD 分成了 100 个分区,此时键的哈希值对 100 取模的结果相同的记录会被放在一个节点上。你也可以使用范围分区法,将键在同一个范围区间内的记录都放在同一个节点上。

 

    举个简单的例子,我们分析这样一个应用,它在内存中保存着一张很大的用户信息表——也就是一个由 (UserID, UserInfo) 对组成的 RDD,其中 UserInfo 包含一个该用户所订阅的主题的列表。该应用会周期性地将这张表与一个小文件进行组合,这个小文件中存着过去五分钟内发生的事件——其实就是一个由 (UserID, LinkInfo) 对组成的表,存放着过去五分钟内某网站各用户的访问情况。例如,我们可能需要对用户访问其未订阅主题的页面的情况进行统计。我们可以使用 Spark 的 join() 操作来实现这个组合操作,其中需要把 UserInfo 和 LinkInfo 的有序对根据 UserID 进行分组。


import org.apache.spark.HashPartitioner

scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
scala> pairs.partitioner
res97: Option[org.apache.spark.Partitioner] = None

scala> val partitioned = pairs.partitionBy(new HashPartitioner(2)).persist
scala> partitioned.partitioner
res99: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

    在这段简短的代码中,我们创建出了一个由 (Int, Int) 对组成的 RDD,初始时没有分区方式信息(一个值为 None 的 Option 对象)。然后通过对第一个 RDD 进行哈希分区,创建出了第二个 RDD。如果确实要在后续操作中使用 partitioned,那就应当在定义 partitioned 时,在第三行输入的最后加上 persist()。这和之前的例子中需要对 userData 调用 persist() 的原因是一样的:如果不调用 persist() 的话,后续的 RDD 操作会对 partitioned 的整个谱系重新求值,这会导致对 pairs 一遍又一遍地进行哈希分区操作。

 

  

上一篇: Ubuntu离线仓库制作

下一篇: Scala中下划线用法总结

82 人读过
文档导航