两种连接spark的方式
// 方法1:使用 SparkContext val conf = new SparkConf().setAppName("MyApp").setMaster("local") val sc = new SparkContext(conf) val input = sc.textFile("file:///etc/profile") input.foreach(println) // 方法2:使用 SparkSession val spark = SparkSession.builder().appName("MyApp").getOrCreate() val profile = spark.read.format("text").load("/user/work/yzy/debug/debug_default_2021_01_07_14.log") profile.show()
基本示例
在 Spark 中,通过对分布式数据集的操作来表达我们的计算意图,这些计算会自动地在集群上并行计算。这样的数据集被称为弹性分布式数据集(resilient distributed dataset),简称 RDD。RDD 是 Spark 对分布式数据和计算的基本抽象。
tar -zxf spark-2.4.5-bin-hadoop2.6.tgz cd spark-2.4.5-bin-hadoop2.6 ./bin/spark-shell scala> val lines = sc.textFile("README.md") lines: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24 scala> lines.count res0: Long = 104 scala> lines.first res1: String = # Apache Spark
驱动器程序
从上层来看,每个 Spark 应用都由一个驱动器程序(driver program)来发起集群上的各种并行操作。驱动器程序包含应用的 main 函数,并且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作。在spark-shell里面,实际的驱动器程序就是 Spark shell 本身,你只需要输入想要运行的操作就可以了。
驱动器程序通过一个 SparkContext 对象来访问 Spark。这个对象代表对计算集群的一个连接。shell 启动时已经自动创建了一个 SparkContext 对象,是一个叫作 sc 的变量。
scala> sc res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7882379b
一旦有了 SparkContext,你就可以用它来创建 RDD,要执行这些操作,驱动器程序一般要管理多个执行器(executor)节点。比如,如果我们在集群上运行 count() 操作,那么不同的节点会统计文件的不同部分的行数。由于我们刚才是在本地模式下运行 Spark shell,因此所有的工作会在单个节点上执行,但你可以将这个 shell 连接到集群上来进行并行的数据分析。
独立应用
除了交互式运行之外,Spark 也可以在 Java、Scala 或 Python 的独立程序中被连接使用。这与在 shell 中使用的主要区别在于你需要自行初始化 SparkContext。
初始化SparkContext
你可以通过先创建一个 SparkConf 对象来配置你的应用,然后基于这个 SparkConf 创建一个 SparkContext 对象。在初始化 SparkContext 之后,你可以使用我们前面展示的所有方法(比如利用文本文件)来创建 RDD 并操控它们。最后,关闭 Spark 可以调用 SparkContext 的 stop() 方法,或者直接退出应用,比如System.exit(0)
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ val conf = new SparkConf().setMaster("local").setAppName("MyApp") val sc = new SparkContext(conf)
只需传递两个参数:
- 集群 URL:告诉 Spark 如何连接到集群上。在这几个例子中我们使用的是 local,这个特殊值可以让 Spark 运行在单机单线程上而无需连接到集群。
- 应用名:在例子中我们使用的是 MyApp。当连接到一个集群时,这个值可以帮助你在集群管理器的用户界面中找到你的应用。
WordCount程序
使用intellij idea创建maven程序
package com.yuchaoshui.note import org.apache.spark.{SparkConf, SparkContext} object Main { def main(args: Array[String]) = { val conf = new SparkConf().setAppName("wordCount").setMaster("local") val sc = new SparkContext(conf) sc.textFile("/etc/profile") // SparkContext读入纯文本。 .flatMap(line => line.split(" ")) // 按照空格拆分成词。 .map(word => (word, 1)) // 将每个词映射成(word,1),这样就形成了key vale键值对。 .reduceByKey((pre, after) => pre + after) // reduceByKey的作用域是key-value类型的键值对,只对每个key的value进行处理。 .sortBy(_._2, ascending=false) // 升序排序 _._表示2元组tuple对象,后面的数字2表示取第几个数作为排序依据。 .foreach(println) // 打印结果 println("done!") } }