1-2 Spark 快速入门
2024-01-04 10:35:29 4 举报
AI智能生成
了解完spark 的概述之后, 首先就快速体验下spark 的入门hello world.
作者其他创作
大纲/内容
快速开始
使用 Spark Shell 进行交互式分析:
基本
有关Dataset操作的更多信息
缓存
独立的应用程序
其他
本教程提供了 Spark 的使用快速介绍。我们将首先通过 Spark 的交互式 shell(Python 或 Scala)介绍 API,然后展示如何使用 Java、Scala 和 Python 编写应用程序。
要按照本指南进行操作,请首先从Spark 网站下载 Spark 的打包版本 。由于我们不会使用 HDFS,因此您可以下载适用于任何版本 Hadoop 的软件包。
需要注意的是,在Spark 2.0之前,Spark的主要编程接口是弹性分布式数据集(RDD)。Spark 2.0 之后,RDD 被 Dataset 取代,Dataset 与 RDD 一样是强类型的,但在底层有更丰富的优化。仍然支持 RDD 接口,您可以在RDD 编程指南中获得更完整的参考。但是,我们强烈建议您改用Dataset,它比RDD具有更好的性能。请参阅SQL 编程指南以获取有关Dataset的更多信息。
要按照本指南进行操作,请首先从Spark 网站下载 Spark 的打包版本 。由于我们不会使用 HDFS,因此您可以下载适用于任何版本 Hadoop 的软件包。
需要注意的是,在Spark 2.0之前,Spark的主要编程接口是弹性分布式数据集(RDD)。Spark 2.0 之后,RDD 被 Dataset 取代,Dataset 与 RDD 一样是强类型的,但在底层有更丰富的优化。仍然支持 RDD 接口,您可以在RDD 编程指南中获得更完整的参考。但是,我们强烈建议您改用Dataset,它比RDD具有更好的性能。请参阅SQL 编程指南以获取有关Dataset的更多信息。
使用 Spark Shell 进行交互式分析
基本
Spark 的 shell 提供了一种学习 API 的简单方法,以及交互式分析数据的强大工具。它可以在 Scala(在 Java VM 上运行,因此是使用现有 Java 库的好方法)或 Python 中使用。通过在 Spark 目录中运行以下命令来启动它:
./bin/spark-shell
Spark 的主要抽象是称为数据集的分布式项目集合。可以从 Hadoop 输入格式(例如 HDFS 文件)或通过转换其他数据集来创建数据集。让我们根据 Spark 源目录中的 README 文件的文本创建一个新的数据集:
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
您可以通过调用某些操作直接从数据集中获取值,或者转换数据集以获取新的数据集。欲了解更多详细信息,请阅读API 文档。
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
现在让我们将此数据集转换为新的数据集。我们调用filter返回一个新的数据集,其中包含文件中项目的子集。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
我们可以将转换和行动链接在一起:
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
./bin/spark-shell
Spark 的主要抽象是称为数据集的分布式项目集合。可以从 Hadoop 输入格式(例如 HDFS 文件)或通过转换其他数据集来创建数据集。让我们根据 Spark 源目录中的 README 文件的文本创建一个新的数据集:
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
您可以通过调用某些操作直接从数据集中获取值,或者转换数据集以获取新的数据集。欲了解更多详细信息,请阅读API 文档。
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
现在让我们将此数据集转换为新的数据集。我们调用filter返回一个新的数据集,其中包含文件中项目的子集。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
我们可以将转换和行动链接在一起:
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
有关Dataset操作的更多信息
数据集操作和转换可用于更复杂的计算。假设我们想要找到单词最多的行:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
首先将一行映射到一个整数值,创建一个新的数据集。reduce调用该数据集来查找最大字数。map和 的参数reduce是 Scala 函数文字(闭包),并且可以使用任何语言功能或 Scala/Java 库。例如,我们可以轻松调用其他地方声明的函数。我们将使用Math.max()函数来使代码更容易理解:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
一种常见的数据流模式是由 Hadoop 推广的 MapReduce。Spark 可以轻松实现 MapReduce 流程:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
在这里,我们调用将行flatMap数据集转换为单词数据集,然后组合groupByKey并count计算文件中每个单词的计数作为(字符串,长整型)对的数据集。要在 shell 中收集字数统计,我们可以调用collect:
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
首先将一行映射到一个整数值,创建一个新的数据集。reduce调用该数据集来查找最大字数。map和 的参数reduce是 Scala 函数文字(闭包),并且可以使用任何语言功能或 Scala/Java 库。例如,我们可以轻松调用其他地方声明的函数。我们将使用Math.max()函数来使代码更容易理解:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
一种常见的数据流模式是由 Hadoop 推广的 MapReduce。Spark 可以轻松实现 MapReduce 流程:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
在这里,我们调用将行flatMap数据集转换为单词数据集,然后组合groupByKey并count计算文件中每个单词的计数作为(字符串,长整型)对的数据集。要在 shell 中收集字数统计,我们可以调用collect:
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
说明, identity 是匿名函数, 函数作用是对传入的值原样返回,此处表示key不变。
缓存
Spark 还支持将数据集拉入集群范围的内存缓存中。当重复访问数据时(例如查询小型“热”数据集或运行 PageRank 等迭代算法时),这非常有用。作为一个简单的示例,让我们将linesWithSpark数据集标记为要缓存:
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
使用 Spark 来探索和缓存 100 行文本文件似乎很愚蠢。有趣的是,这些相同的函数可以用于非常大的数据集,即使它们分布在数十或数百个节点上。您还可以通过连接bin/spark-shell到集群以交互方式执行此操作,如RDD 编程指南中所述。
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
使用 Spark 来探索和缓存 100 行文本文件似乎很愚蠢。有趣的是,这些相同的函数可以用于非常大的数据集,即使它们分布在数十或数百个节点上。您还可以通过连接bin/spark-shell到集群以交互方式执行此操作,如RDD 编程指南中所述。
独立的应用程序
假设我们希望使用 Spark API 编写一个独立的应用程序。我们将演练一个使用 Scala(使用 sbt)、Java(使用 Maven)和 Python(pip)的简单应用程序。
我们将在 Scala 中创建一个非常简单的 Spark 应用程序——事实上,它非常简单,因此被命名为SimpleApp.scala:
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
请注意,应用程序应该定义main()方法而不是扩展scala.App。的子类scala.App可能无法正常工作。
该程序仅计算 Spark README 中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。与前面使用 Spark shell 的示例不同,Spark shell 会初始化自己的 SparkSession,而我们将 SparkSession 初始化为程序的一部分。
我们调用SparkSession.builder构造一个[[SparkSession]],然后设置应用程序名称,最后调用getOrCreate获取[[SparkSession]]实例。
我们的应用程序依赖于 Spark API,因此我们还将包含一个 sbt 配置文件 , build.sbt它解释了 Spark 是一个依赖项。该文件还添加了 Spark 依赖的存储库:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
为了使 sbt 正常工作,我们需要 根据典型的目录结构进行SimpleApp.scala布局。build.sbt一旦完成,我们就可以创建一个包含应用程序代码的 JAR 包,然后使用该spark-submit脚本来运行我们的程序。
# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23
我们将在 Scala 中创建一个非常简单的 Spark 应用程序——事实上,它非常简单,因此被命名为SimpleApp.scala:
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
请注意,应用程序应该定义main()方法而不是扩展scala.App。的子类scala.App可能无法正常工作。
该程序仅计算 Spark README 中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。与前面使用 Spark shell 的示例不同,Spark shell 会初始化自己的 SparkSession,而我们将 SparkSession 初始化为程序的一部分。
我们调用SparkSession.builder构造一个[[SparkSession]],然后设置应用程序名称,最后调用getOrCreate获取[[SparkSession]]实例。
我们的应用程序依赖于 Spark API,因此我们还将包含一个 sbt 配置文件 , build.sbt它解释了 Spark 是一个依赖项。该文件还添加了 Spark 依赖的存储库:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
为了使 sbt 正常工作,我们需要 根据典型的目录结构进行SimpleApp.scala布局。build.sbt一旦完成,我们就可以创建一个包含应用程序代码的 JAR 包,然后使用该spark-submit脚本来运行我们的程序。
# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23
其他
恭喜您运行您的第一个 Spark 应用程序!
要深入了解 API,请从RDD 编程指南和SQL 编程指南开始,或者参阅其他组件的“编程指南”菜单。
要在集群上运行应用程序,请参阅部署概述。
最后,Spark 在目录中包含了几个示例examples(Scala、 Java、 Python、 R)。您可以按如下方式运行它们:
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
要深入了解 API,请从RDD 编程指南和SQL 编程指南开始,或者参阅其他组件的“编程指南”菜单。
要在集群上运行应用程序,请参阅部署概述。
最后,Spark 在目录中包含了几个示例examples(Scala、 Java、 Python、 R)。您可以按如下方式运行它们:
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
0 条评论
下一页