Spark快速上手
2020-03-13 13:45:41    31    0    0
yuziyue

基本示例

在 Spark 中,通过对分布式数据集的操作来表达我们的计算意图,这些计算会自动地在集群上并行计算。这样的数据集被称为弹性分布式数据集(resilient distributed dataset),简称 RDD。RDD 是 Spark 对分布式数据和计算的基本抽象。

tar -zxf spark-2.4.5-bin-hadoop2.6.tgz
cd spark-2.4.5-bin-hadoop2.6
./bin/spark-shell
scala> val lines = sc.textFile("README.md")
lines: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24

scala> lines.count
res0: Long = 104                                                                

scala> lines.first
res1: String = # Apache Spark

 

驱动器程序

从上层来看,每个 Spark 应用都由一个驱动器程序(driver program)来发起集群上的各种并行操作。驱动器程序包含应用的 main 函数,并且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作。在spark-shell里面,实际的驱动器程序就是 Spark shell 本身,你只需要输入想要运行的操作就可以了。

 

驱动器程序通过一个 SparkContext 对象来访问 Spark。这个对象代表对计算集群的一个连接。shell 启动时已经自动创建了一个 SparkContext 对象,是一个叫作 sc 的变量。

scala> sc
res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7882379b

 

一旦有了 SparkContext,你就可以用它来创建 RDD,要执行这些操作,驱动器程序一般要管理多个执行器(executor)节点。比如,如果我们在集群上运行 count() 操作,那么不同的节点会统计文件的不同部分的行数。由于我们刚才是在本地模式下运行 Spark shell,因此所有的工作会在单个节点上执行,但你可以将这个 shell 连接到集群上来进行并行的数据分析。

 

 

独立应用

除了交互式运行之外,Spark 也可以在 Java、Scala 或 Python 的独立程序中被连接使用。这与在 shell 中使用的主要区别在于你需要自行初始化 SparkContext。

 

初始化SparkContext

你可以通过先创建一个 SparkConf 对象来配置你的应用,然后基于这个 SparkConf 创建一个 SparkContext 对象。在初始化 SparkContext 之后,你可以使用我们前面展示的所有方法(比如利用文本文件)来创建 RDD 并操控它们。最后,关闭 Spark 可以调用 SparkContext 的 stop() 方法,或者直接退出应用,比如System.exit(0)

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
val conf = new SparkConf().setMaster("local").setAppName("MyApp")
val sc = new SparkContext(conf)

只需传递两个参数:

  • 集群 URL:告诉 Spark 如何连接到集群上。在这几个例子中我们使用的是 local,这个特殊值可以让 Spark 运行在单机单线程上而无需连接到集群。
  • 应用名:在例子中我们使用的是 MyApp。当连接到一个集群时,这个值可以帮助你在集群管理器的用户界面中找到你的应用。

 

WordCount程序

使用intellij idea创建maven程序

package com.yuchaoshui.note

import org.apache.spark.{SparkConf, SparkContext}

object Main {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("wordCount").setMaster("local")
    val sc = new SparkContext(conf)
    sc.textFile("/etc/profile")                  // SparkContext读入纯文本。
      .flatMap(line => line.split(" "))          // 按照空格拆分成词。
      .map(word => (word, 1))                    // 将每个词映射成(word,1),这样就形成了key vale键值对。
      .reduceByKey((pre, after) => pre + after)  // reduceByKey的作用域是key-value类型的键值对,只对每个key的value进行处理。
      .sortBy(_._2, ascending=false)             // 升序排序 _._表示2元组tuple对象,后面的数字2表示取第几个数作为排序依据。
      .foreach(println)                          // 打印结果
    println("done!")
  }
}


 

 

上一篇: Spark RDD基础

下一篇: Scala编程第3版第七章(函数)

31 人读过
文档导航