Spark RDD基础
2020-03-13 18:54:40    43    0    0
yuziyue

RDD基础

    Spark 中的 RDD 就是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含 Python、Java、Scala 中任意类型的对象,甚至可以包含用户自定义的对象。用户可以使用两种方法创建 RDD:读取一个外部数据集,或在驱动器程序里分发驱动器程序中的对象集合(比如 list 和 set)。

val lines = sc.textFile("/etc/profile")

// 转化操作
val startsWithExport = lines.filter(_.startsWith("export"))

// 行动操作
val firstLine = lines.first

// 持久化到内存
lines.persist()

    创建出来后,RDD 支持两种类型的操作:转化操作(transformation)和行动操作(action)。转化操作会由一个 RDD 生成一个新的 RDD,比如这里的startsWithExport,另一方面,行动操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。


    转化操作和行动操作的区别在于 Spark 计算 RDD 的方式不同。虽然你可以在任何时候定义新的 RDD,但 Spark 只会惰性计算这些 RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。这种策略刚开始看起来可能会显得有些奇怪,不过在大数据领域是很有道理的。


    最后,默认情况下,Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。在实际操作中,你会经常用 persist() 来把数据的一部分读取到内存中,并反复查询这部分数据。


创建RDD

Spark 提供了两种创建 RDD 的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化。

// 在驱动器程序中对一个集合进行并行化创建RDD
val lines = sc.parallelize(List("pandas", "i like pandas"))

// 从外部存储中读取数据来创建 RDD。
val lines = sc.textFile("/path/to/README.md")


    如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是 RDD,而行动操作返回的是其他的数据类型。


RDD操作

    转化操作:RDD 的转化操作是返回新 RDD 的操作。通过转化操作,你从已有的 RDD 中派生出新的 RDD,Spark 会使用谱系图(lineage graph)来记录这些不同 RDD 之间的依赖关系。Spark 需要用这些信息来按需计算每个 RDD,也可以依靠谱系图在持久化的 RDD 丢失部分数据时恢复所丢失的数据。

    行动操作:行动操作是第二种类型的 RDD 操作,它们会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生成实际的输出,它们会强制执行那些求值必须用到的 RDD 的转化操作。

// 只取前十行数据
lines.take(10).foreach(println)

    RDD 还有一个 collect() 函数,可以用来获取整个 RDD 中的数据。如果你的程序把 RDD 筛选到一个很小的规模,并且你想在本地处理这些数据时,就可以使用它。记住,只有当你的整个数据集能在单台机器的内存中放得下时,才能使用 collect(),因此,collect() 不能用在大规模数据集上。


    在大多数情况下,RDD 不能通过 collect() 收集到驱动器进程中,因为它们一般都很大。此时,我们通常要把数据写到诸如 HDFS 或 Amazon S3 这样的分布式的存储系统中。你可以使用 saveAsTextFile()、saveAsSequenceFile(),或者任意的其他行动操作来把 RDD 的数据内容以各种自带的格式保存起来。


惰性求值

    惰性求值意味着当我们对 RDD 调用转化操作(例如调用 map())时,操作不会立即执行。相反,Spark 会在内部记录下所要求执行的操作的相关信息。我们不应该把 RDD 看作存放着特定数据的数据集,而最好把每个 RDD 当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。把数据读取到 RDD 的操作也同样是惰性的。因此,当我们调用 sc.textFile() 时,数据并没有读取进来,而是在必要时才会读取。和转化操作一样的是,读取数据的操作也有可能会多次执行。


    虽然转化操作是惰性求值的,但还是可以随时通过运行一个行动操作来强制 Spark 执行 RDD 的转化操作,比如使用 count()。这是一种对你所写的程序进行部分测试的简单方法。


向spark传递函数

传递一个对象的方法或者字段时,会包含对整个对象的引用。所以需要将具体的字段摘出来,防止将整个对象传递进入。

// 错误
class SearchFunctions(val query:String) {
 def isMatch(s:String):Boolean = {
  s.contains(query)
 }
 def getMatchesFunctionReference(rdd:RDD[String]):RDD[String] = {
  rdd.map(isMatch)
 }
}

// 正确
class SearchFunctions(val query:String) {
 def isMatch(s:String):Boolean = {
  s.contains(query)
 }
 def getMatchesFunctionReference(rdd:RDD[String]):RDD[String] = {
   val new = this.isMatch
  rdd.map(new)
 }
}


flatMap

    有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫作 flatMap()。和 map() 类似,我们提供给 flatMap() 的函数被分别应用到了输入 RDD 的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器。输出的 RDD 倒不是由迭代器组成的。我们得到的是一个包含各个迭代器可访问的所有元素的 RDD。flatMap() 的一个简单用途是把输入的字符串切分为单词,

// map
scala> sc.parallelize(List("hello world", "hi")).map(x => x.split(" ")).collect
res0: Array[Array[String]] = Array(Array(hello, world), Array(hi))

// flatMap
scala> sc.parallelize(List("hello world", "hi")).flatMap(x => x.split(" ")).collect
res1: Array[String] = Array(hello, world, hi)

你可以把 flatMap() 看作将返回的迭代器“拍扁”,这样就得到了一个由各列表中的元素组成的 RDD,而不是一个由列表组成的 RDD


伪集合操作

    尽管 RDD 本身不是严格意义上的集合,但它也支持许多数学上的集合操作,比如合并和相交操作。图3-4 展示了四种操作。注意,这些操作都要求操作的 RDD 是相同数据类型的。

  • RDD.distinct() 转化操作来生成一个只包含不同元素的新 RDD。不过需要注意,distinct() 操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份。
  • union(other)会返回一个包含两个 RDD 中所有元素的 RDD,两个RDD相同的元素不会去重。
  • intersection(other) 方法,只返回两个 RDD 中都有的元素。intersection() 在运行时也会去掉所有重复的元素(单个 RDD 内的重复元素也会一起移除)。尽管 intersection() 与 union() 的概念相似,intersection() 的性能却要差很多,因为它需要通过网络混洗数据来发现共有的元素。
  • subtract(other) 函数接收另一个RDD作为参数,返回一个由只存在于第一个 RDD 中而不存在于第二个 RDD 中的所有元素组成的 RDD。和 intersection() 一样,它也需要数据混洗。
  • cartesian(other) 转化操作会返回所有可能的 (a, b) 对,其中 a 是源 RDD 中的元素,而 b 则来自另一个 RDD。笛卡儿积在我们希望考虑所有可能的组合的相似度时比较有用,比如计算各用户对各种产品的预期兴趣程度。我们也可以求一个 RDD 与其自身的笛卡儿积,这可以用于求用户相似度的应用中。不过要特别注意的是,求大规模 RDD 的笛卡儿积开销巨大。


reduce和fold

// 相当于 1 + 2 + 3 + 4
scala> sc.parallelize(List(1,2,3,4)).reduce((x, y) => x+y)
res2: Int = 10

// 相当于0 + 1 + 2 + 3 + 4这里的初始值不能是其他值,根据元素类型和操作类型值来确定
// 例如 + 对应的 0,* 对应的 1,或拼接操作对应的空列表
scala> sc.parallelize(List(1,2,3,4)).fold(0)((x, y) => x + y)
res12: Int = 10
scala> sc.parallelize(List(1,2,3,4)).fold(1)((x, y) => x * y)
res20: Int = 24


aggregate

reducefold,这两个函数要求它们的返回值必须与rdd的数据类型相同,而aggregate函数则可以自定义它的返回值类型。比如下面要计算列表的平均值。它的整体调用参数是这样的:aggregate(0, 0)(seqOp, combOp)

val a = sc.parallelize(List(1,2,3,4))
val b = a.aggregate(0, 0)((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
val c = b._1 / b._2.toFloat
  • (0, 0) 表示初始化值为两个元素的元组,后续的计算都返回这样的格式,第一个零表示元素总和, 第二元素表示元素总个数。
  • seqOp表示计算下一次返回的的结果函数。(x, y) => (x._1 + y, x._2 + 1) :x 表示上一次的计算结果元组,x._1表示上一次的计算结果元组的第一个元素,y 表示计算的当前元素,x._2表示上一次的计算结果元组的第二个元素,
  • combOp含义:因为我们的计算是分布式计算,这个函数是将各个节点的返回值合并。这里的 x 和 y 都是相同类型的,都是两个元素的元组,第一个表示节点计算的元素的和,第二个表示该节点计算的元素的个数。(x, y) => (x._1 + y._1, x._2 + y._2)所有安装对应关系加上即可。
  • b 是最后统计的结果,也是一个二元组。


下面是对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD行动操作


持久化(缓存)

    如前所述,Spark RDD 是惰性求值的,而有时我们希望能多次使用同一个 RDD。如果简单地对 RDD 调用行动操作,Spark 每次都会重算 RDD 以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法常常会多次使用同一组数据。


    为了避免多次计算同一个 RDD,可以让 Spark 对数据进行持久化。当我们让 Spark 持久化存储一个 RDD 时,计算出 RDD 的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。

import org.apache.spark.storage.StorageLevel

// 如有必要,可以通过在存储级别的末尾加上“_2”来把持久化数据存为两份
val result = sc.parallelize(List(1,2,3,4)).map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())

    注意,我们在第一次对这个 RDD 调用行动操作前就调用了 persist() 方法。persist() 调用本身不会触发强制求值。持久化的是实际的计算逻辑。


    如果要缓存的数据太多,内存中放不下,Spark 会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。对于仅把数据存放在内存中的缓存级别,下一次要用到已经被移除的分区时,这些分区就需要重新计算。但是对于使用内存与磁盘的缓存级别的分区来说,被移除的分区都会写入磁盘。不论哪一种情况,都不必担心你的作业因为缓存了太多数据而被打断。不过,缓存不必要的数据会导致有用的数据被移出内存,带来更多重算的时间开销。


    最后,RDD 还有一个方法叫作 unpersist(),调用该方法可以手动把持久化的 RDD 从缓存中移除。下面是常用的持久化级别表格。

上一篇: Scala中下划线用法总结

下一篇: Spark快速上手

43 人读过
文档导航