1-3 Spark RDD 编程指南
2024-01-12 10:56:36 0 举报
AI智能生成
Spark 官网 RDD编程指南思维笔记。
作者其他创作
大纲/内容
目录
概述
与 Spark 链接
初始化 Spark
使用Shell
弹性分布式数据集 (RDD)
并行集合
外部数据集
RDD操作
基本
将函数传递给 Spark
了解闭包
例子
本地模式与集群模式
打印 RDD 的元素
使用键值对
Transformations
Actions
Shuffle operations
背景
性能影响
RDD持久化
选择哪个存储级别?
删除数据
共享变量
广播变量
累加器
部署到集群
从 Java/Scala 启动 Spark 作业
单元测试
其他
概述
从较高的层面来看,每个 Spark 应用程序都包含一个驱动程序,该程序运行用户的函数并在集群上main执行各种并行操作。Spark 提供的主要抽象是弹性分布式数据集(RDD),它是跨集群节点分区的元素集合,可以并行操作。RDD 是通过从 Hadoop 文件系统(或任何其他 Hadoop 支持的文件系统)中的文件或驱动程序中现有的 Scala 集合开始,然后对其进行转换来创建的。用户还可以要求 Spark 将RDD持久保存在内存中,从而使其能够在并行操作中高效地重用。最后,RDD 会自动从节点故障中恢复。
Spark 中的第二个抽象是可在并行操作中使用的共享变量。默认情况下,当 Spark 将函数作为一组任务在不同节点上并行运行时,它会将函数中使用的每个变量的副本发送给每个任务。有时,需要在任务之间或任务和驱动程序之间共享变量。Spark 支持两种类型的共享变量:广播变量(可用于在所有节点上的内存中缓存值)和累加器(仅“添加”变量,例如计数器和总和)。
本指南以 Spark 支持的每种语言展示了其中的每一项功能。bin/spark-shell如果您启动 Spark 的交互式 shell(无论是 Scala shell 还是 bin/pysparkPython shell),那么最容易遵循。
Spark 中的第二个抽象是可在并行操作中使用的共享变量。默认情况下,当 Spark 将函数作为一组任务在不同节点上并行运行时,它会将函数中使用的每个变量的副本发送给每个任务。有时,需要在任务之间或任务和驱动程序之间共享变量。Spark 支持两种类型的共享变量:广播变量(可用于在所有节点上的内存中缓存值)和累加器(仅“添加”变量,例如计数器和总和)。
本指南以 Spark 支持的每种语言展示了其中的每一项功能。bin/spark-shell如果您启动 Spark 的交互式 shell(无论是 Scala shell 还是 bin/pysparkPython shell),那么最容易遵循。
与 Spark 链接
Spark 2.3.0 的构建和分发默认与 Scala 2.11 配合使用。(Spark 也可以与其他版本的 Scala 一起使用。)要在 Scala 中编写应用程序,您需要使用兼容的 Scala 版本(例如 2.11.X)。
要编写 Spark 应用程序,您需要添加 Spark 的 Maven 依赖项。Spark 可通过 Maven Central 获取:
groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.3.0
此外,如果您希望访问 HDFS 集群,则需要 hadoop-client为您的 HDFS 版本添加依赖项。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最后,您需要将一些 Spark 类导入到您的程序中。添加以下行:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
(在 Spark 1.3.0 之前,您需要显式import org.apache.spark.SparkContext._启用必要的隐式转换。)
要编写 Spark 应用程序,您需要添加 Spark 的 Maven 依赖项。Spark 可通过 Maven Central 获取:
groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.3.0
此外,如果您希望访问 HDFS 集群,则需要 hadoop-client为您的 HDFS 版本添加依赖项。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最后,您需要将一些 Spark 类导入到您的程序中。添加以下行:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
(在 Spark 1.3.0 之前,您需要显式import org.apache.spark.SparkContext._启用必要的隐式转换。)
初始化 Spark
Spark 程序必须做的第一件事是创建一个SparkContext对象,它告诉 Spark 如何访问集群。要创建 SparkConf,SparkContext您首先需要构建一个包含有关您的应用程序的信息的SparkConf对象。
每个 JVM 只能有一个 SparkContext 处于活动状态。stop()在创建新 SparkContext 之前,您必须先激活 SparkContext。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
该appName参数是您的应用程序在集群 UI 上显示的名称。 master是Spark、Mesos 或 YARN 集群 URL,或在本地模式下运行的特殊“本地”字符串。实际上,当在集群上运行时,您不会希望master在程序中进行硬编码,而是希望启动应用程序并spark-submit在那里接收它。但是,对于本地测试和单元测试,您可以通过“local”在进程内运行 Spark。
每个 JVM 只能有一个 SparkContext 处于活动状态。stop()在创建新 SparkContext 之前,您必须先激活 SparkContext。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
该appName参数是您的应用程序在集群 UI 上显示的名称。 master是Spark、Mesos 或 YARN 集群 URL,或在本地模式下运行的特殊“本地”字符串。实际上,当在集群上运行时,您不会希望master在程序中进行硬编码,而是希望启动应用程序并spark-submit在那里接收它。但是,对于本地测试和单元测试,您可以通过“local”在进程内运行 Spark。
使用Shell
在 Spark shell 中,已经为您创建了一个特殊的解释器感知 SparkContext,位于名为 的变量中sc。制作自己的 SparkContext 是行不通的。您可以使用参数设置上下文连接到哪个主机--master,并且可以通过将逗号分隔的列表传递给参数来将 JAR 添加到类路径--jars。您还可以通过向参数提供以逗号分隔的 Maven 坐标列表来将依赖项(例如 Spark 包)添加到 shell 会话中--packages。可能存在依赖关系的任何其他存储库(例如 Sonatype)都可以传递给参数--repositories。例如,要bin/spark-shell在四个核心上运行,请使用:
$ ./bin/spark-shell --master local[4]
或者,要添加code.jar到其类路径中,请使用:
$ ./bin/spark-shell --master local[4] --jars code.jar
要使用 Maven 坐标包含依赖项:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
要获得完整的选项列表,请运行spark-shell --help。在幕后, spark-shell调用更通用的spark-submit脚本。
$ ./bin/spark-shell --master local[4]
或者,要添加code.jar到其类路径中,请使用:
$ ./bin/spark-shell --master local[4] --jars code.jar
要使用 Maven 坐标包含依赖项:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
要获得完整的选项列表,请运行spark-shell --help。在幕后, spark-shell调用更通用的spark-submit脚本。
弹性分布式数据集 (RDD)
Spark 围绕弹性分布式数据集(RDD)的概念,它是可以并行操作的容错元素集合。创建 RDD 有两种方法:并行 化驱动程序中的现有集合,或引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据源。
并行集合
并行集合是通过在驱动程序(Scala)中的现有集合上调用 的方法SparkContext来创建的。集合的元素被复制以形成可以并行操作的分布式数据集。例如,以下是如何创建包含数字 1 到 5 的并行集合:parallelizeSeq
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
一旦创建,分布式数据集(distData)就可以并行操作。例如,我们可能会调用distData.reduce((a, b) => a + b)将数组的元素相加。稍后我们将描述分布式数据集上的操作。
并行集合的一个重要参数是将数据集划分为的分区数量。Spark 将为集群的每个分区运行一个任务。通常,您希望集群中的每个 CPU 有 2-4 个分区。通常,Spark 会尝试根据您的集群自动设置分区数量。但是,您也可以通过将其作为第二个参数传递给parallelize(例如)来手动设置它sc.parallelize(data, 10)。注意:代码中的某些地方使用术语“切片”(分区的同义词)来保持向后兼容性。
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
一旦创建,分布式数据集(distData)就可以并行操作。例如,我们可能会调用distData.reduce((a, b) => a + b)将数组的元素相加。稍后我们将描述分布式数据集上的操作。
并行集合的一个重要参数是将数据集划分为的分区数量。Spark 将为集群的每个分区运行一个任务。通常,您希望集群中的每个 CPU 有 2-4 个分区。通常,Spark 会尝试根据您的集群自动设置分区数量。但是,您也可以通过将其作为第二个参数传递给parallelize(例如)来手动设置它sc.parallelize(data, 10)。注意:代码中的某些地方使用术语“切片”(分区的同义词)来保持向后兼容性。
外部Datasets
Spark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。Spark 支持TexfFiles、SequenceFiles和任何其他 Hadoop InputFormat。
文本文件 RDD 可以使用SparkContext的textFile方法创建。此方法采用文件的 URI(计算机上的本地路径,或 hdfs://和s3a://URI等)并将其作为行集合读取。下面是一个调用示例:
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
创建后,distFile可以通过数据集操作进行操作。例如,我们可以使用map和reduce运算将所有线的大小相加,如下所示:distFile.map(s => s.length).reduce((a, b) => a + b)。
Spark读取文件的一些注意事项:
如果使用本地文件系统上的路径,则该文件还必须可以在工作节点上的同一路径上访问。将文件复制到所有Worker节点或使用网络安装的共享文件系统。
Spark 的所有基于文件的输入方法(包括textFile )都支持在目录、压缩文件和通配符上运行。例如,您可以使用textFile("/my/directory")、textFile("/my/directory/*.txt")和textFile("/my/directory/*.gz")。
该textFile方法还采用可选的第二个参数来控制文件的分区数。默认情况下,Spark 为文件的每个块创建一个分区(HDFS 中默认块为 128MB),但您也可以通过传递更大的值来请求更多数量的分区。请注意,分区不能少于块。
SparkContext.wholeTextFiles允许您读取包含多个小文本文件的目录,并将每个文件作为(文件名,内容)对返回。这与 textFile形成鲜明对比,后者将在每个文件中每行返回一条记录。分区由数据局部性决定,在某些情况下,这可能会导致分区过少。对于这些情况,wholeTextFiles提供可选的第二个参数来控制分区的最小数量。
对于SequenceFiles,使用 SparkContext 的sequenceFile[K, V]方法,其中K和V是文件中键和值的类型。这些应该是 Hadoop 的Writable接口的子类,例如IntWritable和Text。此外,Spark 允许您为一些常见的 Writable 指定本机类型;例如,sequenceFile[Int, String]会自动读取IntWritables和Texts。
对于其他 Hadoop 输入格式,您可以使用该SparkContext.hadoopRDD方法,该方法采用任意JobConf输入格式类、键类和值类。设置这些内容的方式与使用输入源设置 Hadoop 作业的方式相同。您还可以使用SparkContext.newAPIHadoopRDD基于“新”MapReduce API ( org.apache.hadoop.mapreduce) 的输入格式。
RDD.saveAsObjectFile并SparkContext.objectFile支持以由序列化 Java 对象组成的简单格式保存 RDD。虽然这不如 Avro 等专用格式高效,但它提供了一种保存任何 RDD 的简单方法。
文本文件 RDD 可以使用SparkContext的textFile方法创建。此方法采用文件的 URI(计算机上的本地路径,或 hdfs://和s3a://URI等)并将其作为行集合读取。下面是一个调用示例:
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
创建后,distFile可以通过数据集操作进行操作。例如,我们可以使用map和reduce运算将所有线的大小相加,如下所示:distFile.map(s => s.length).reduce((a, b) => a + b)。
Spark读取文件的一些注意事项:
如果使用本地文件系统上的路径,则该文件还必须可以在工作节点上的同一路径上访问。将文件复制到所有Worker节点或使用网络安装的共享文件系统。
Spark 的所有基于文件的输入方法(包括textFile )都支持在目录、压缩文件和通配符上运行。例如,您可以使用textFile("/my/directory")、textFile("/my/directory/*.txt")和textFile("/my/directory/*.gz")。
该textFile方法还采用可选的第二个参数来控制文件的分区数。默认情况下,Spark 为文件的每个块创建一个分区(HDFS 中默认块为 128MB),但您也可以通过传递更大的值来请求更多数量的分区。请注意,分区不能少于块。
SparkContext.wholeTextFiles允许您读取包含多个小文本文件的目录,并将每个文件作为(文件名,内容)对返回。这与 textFile形成鲜明对比,后者将在每个文件中每行返回一条记录。分区由数据局部性决定,在某些情况下,这可能会导致分区过少。对于这些情况,wholeTextFiles提供可选的第二个参数来控制分区的最小数量。
对于SequenceFiles,使用 SparkContext 的sequenceFile[K, V]方法,其中K和V是文件中键和值的类型。这些应该是 Hadoop 的Writable接口的子类,例如IntWritable和Text。此外,Spark 允许您为一些常见的 Writable 指定本机类型;例如,sequenceFile[Int, String]会自动读取IntWritables和Texts。
对于其他 Hadoop 输入格式,您可以使用该SparkContext.hadoopRDD方法,该方法采用任意JobConf输入格式类、键类和值类。设置这些内容的方式与使用输入源设置 Hadoop 作业的方式相同。您还可以使用SparkContext.newAPIHadoopRDD基于“新”MapReduce API ( org.apache.hadoop.mapreduce) 的输入格式。
RDD.saveAsObjectFile并SparkContext.objectFile支持以由序列化 Java 对象组成的简单格式保存 RDD。虽然这不如 Avro 等专用格式高效,但它提供了一种保存任何 RDD 的简单方法。
RDD操作
RDD 支持两种类型的操作:转换(从现有数据集创建新数据集)和操作(在对数据集运行计算后将值返回给驱动程序)。例如,map是一种转换,它将每个数据集元素传递给函数并返回表示结果的新 RDD。另一方面,reduce是使用某个函数聚合 RDD 的所有元素并将最终结果返回给驱动程序的操作(尽管也有reduceByKey返回分布式数据集的并行操作)。
Spark 中的所有转换都是惰性的,因为它们不会立即计算结果。相反,他们只记住应用于某些基础数据集(例如文件)的转换。仅当操作需要将结果返回到驱动程序时才计算转换。这样的设计使得Spark能够更高效地运行。例如,我们可以意识到通过创建的数据集map将在 a 中使用reduce,并且仅将 a 的结果返回reduce给驱动程序,而不是更大的映射数据集。
默认情况下,每次对每个转换后的 RDD 运行操作时,都可能会重新计算它。但是,您也可以使用persistcache(或Cache) 方法将RDD保留在内存中,在这种情况下,Spark 会将元素保留在集群上,以便下次查询时更快地访问。还支持将 RDD 持久保存在磁盘上或跨多个节点复制。
Spark 中的所有转换都是惰性的,因为它们不会立即计算结果。相反,他们只记住应用于某些基础数据集(例如文件)的转换。仅当操作需要将结果返回到驱动程序时才计算转换。这样的设计使得Spark能够更高效地运行。例如,我们可以意识到通过创建的数据集map将在 a 中使用reduce,并且仅将 a 的结果返回reduce给驱动程序,而不是更大的映射数据集。
默认情况下,每次对每个转换后的 RDD 运行操作时,都可能会重新计算它。但是,您也可以使用persistcache(或Cache) 方法将RDD保留在内存中,在这种情况下,Spark 会将元素保留在集群上,以便下次查询时更快地访问。还支持将 RDD 持久保存在磁盘上或跨多个节点复制。
Basics
为了说明 RDD 基础知识,请考虑下面的简单程序:
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
第一行定义来自外部文件的基本 RDD。该数据集未加载到内存中或以其他方式执行:lines只是指向文件的指针。第二行定义lineLengths为转换的结果map。同样,由于懒惰,不会立即lineLengths 计算。最后,我们运行reduce,这是一个动作。此时,Spark 将计算分解为在单独的机器上运行的任务,每台机器都运行其映射部分和本地缩减,仅将其答案返回给驱动程序。
如果我们以后还想lineLengths再次使用,我们可以添加:
lineLengths.persist()
在 之前reduce,这将导致lineLengths在第一次计算后保存在内存中。
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
第一行定义来自外部文件的基本 RDD。该数据集未加载到内存中或以其他方式执行:lines只是指向文件的指针。第二行定义lineLengths为转换的结果map。同样,由于懒惰,不会立即lineLengths 计算。最后,我们运行reduce,这是一个动作。此时,Spark 将计算分解为在单独的机器上运行的任务,每台机器都运行其映射部分和本地缩减,仅将其答案返回给驱动程序。
如果我们以后还想lineLengths再次使用,我们可以添加:
lineLengths.persist()
在 之前reduce,这将导致lineLengths在第一次计算后保存在内存中。
给Spark 传递函数
Spark的API很大程度上依赖于驱动程序中传递函数来在集群上运行。有两种推荐的方法可以做到这一点:
匿名函数语法,可用于短代码。
全局单例对象中的静态方法。例如,您可以定义object MyFunctions然后传递MyFunctions.func1,如下所示:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
请注意,虽然也可以传递对类实例中方法的引用(而不是单例对象),但这需要将包含该类的对象与方法一起发送。例如,考虑:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
这里,如果我们创建一个新MyClass实例并调用doStuff它,map里面会引用 该实例的func1方法,因此需要将整个对象发送到集群。这与写作类似。MyClassrdd.map(x => this.func1(x))
以类似的方式,访问外部对象的字段将引用整个对象:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
相当于writing rdd.map(x => this.field + x),它引用了所有的this. 为了避免这个问题,最简单的方法是复制field到局部变量中,而不是从外部访问它:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
匿名函数语法,可用于短代码。
全局单例对象中的静态方法。例如,您可以定义object MyFunctions然后传递MyFunctions.func1,如下所示:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
请注意,虽然也可以传递对类实例中方法的引用(而不是单例对象),但这需要将包含该类的对象与方法一起发送。例如,考虑:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
这里,如果我们创建一个新MyClass实例并调用doStuff它,map里面会引用 该实例的func1方法,因此需要将整个对象发送到集群。这与写作类似。MyClassrdd.map(x => this.func1(x))
以类似的方式,访问外部对象的字段将引用整个对象:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
相当于writing rdd.map(x => this.field + x),它引用了所有的this. 为了避免这个问题,最简单的方法是复制field到局部变量中,而不是从外部访问它:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
了解闭包
Spark 的难点之一是理解跨集群执行代码时变量和方法的范围和生命周期。修改超出其范围的变量的 RDD 操作可能是造成混乱的常见原因。在下面的示例中,我们将查看用于foreach()递增计数器的代码,但其他操作也可能会出现类似的问题。
例子
考虑下面的朴素 RDD 元素总和,它的行为可能会有所不同,具体取决于执行是否发生在同一 JVM 中。一个常见的示例是在local模式 ( --master = local[n]) 下运行 Spark 与将 Spark 应用程序部署到集群(例如,通过 Spark-submit 到 YARN):
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
例子
考虑下面的朴素 RDD 元素总和,它的行为可能会有所不同,具体取决于执行是否发生在同一 JVM 中。一个常见的示例是在local模式 ( --master = local[n]) 下运行 Spark 与将 Spark 应用程序部署到集群(例如,通过 Spark-submit 到 YARN):
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
本地模式与集群模式
上述代码的行为未定义,可能无法按预期工作。为了执行作业,Spark 将 RDD 操作的处理分解为任务,每个任务都由执行器执行。在执行之前,Spark 计算任务的闭包。闭包是执行器在 RDD 上执行计算时必须可见的变量和方法(在本例中foreach())。该闭包被序列化并发送到每个执行器。
发送到每个执行器的闭包中的变量现在是副本,因此,当在函数内引用计数器foreach时,它不再是驱动程序节点上的计数器。驱动程序节点的内存中仍然有一个计数器,但执行程序不再可见!执行者只能看到序列化闭包中的副本。因此,计数器的最终值仍然为零,因为计数器上的所有操作都引用序列化闭包内的值。
在本地模式下,在某些情况下,该foreach函数实际上将在与驱动程序相同的 JVM 中执行,并将引用相同的原始计数器,并且实际上可能会更新它。
为了确保在这些场景中定义良好的行为,应该使用Accumulator. Spark 中的累加器专门用于提供一种机制,用于在集群中的工作节点之间分割执行时安全地更新变量。本指南的累加器部分更详细地讨论了这些内容。
一般来说,闭包——像循环或本地定义的方法这样的结构,不应该被用来改变一些全局状态。Spark 不定义或保证从闭包外部引用的对象的突变行为。执行此操作的某些代码可能在本地模式下工作,但这只是偶然的,并且此类代码在分布式模式下不会按预期运行。如果需要一些全局聚合,请改用累加器。
发送到每个执行器的闭包中的变量现在是副本,因此,当在函数内引用计数器foreach时,它不再是驱动程序节点上的计数器。驱动程序节点的内存中仍然有一个计数器,但执行程序不再可见!执行者只能看到序列化闭包中的副本。因此,计数器的最终值仍然为零,因为计数器上的所有操作都引用序列化闭包内的值。
在本地模式下,在某些情况下,该foreach函数实际上将在与驱动程序相同的 JVM 中执行,并将引用相同的原始计数器,并且实际上可能会更新它。
为了确保在这些场景中定义良好的行为,应该使用Accumulator. Spark 中的累加器专门用于提供一种机制,用于在集群中的工作节点之间分割执行时安全地更新变量。本指南的累加器部分更详细地讨论了这些内容。
一般来说,闭包——像循环或本地定义的方法这样的结构,不应该被用来改变一些全局状态。Spark 不定义或保证从闭包外部引用的对象的突变行为。执行此操作的某些代码可能在本地模式下工作,但这只是偶然的,并且此类代码在分布式模式下不会按预期运行。如果需要一些全局聚合,请改用累加器。
打印Rdd元素
dd.foreach(println)另一个常见的习惯用法是尝试使用或打印出 RDD 的元素rdd.map(println)。在一台机器上,这将生成预期的输出并打印 RDD 的所有元素。但是,在cluster模式下,stdout执行程序调用的输出现在写入执行程序stdout,而不是驱动程序上的输出,因此stdout驱动程序不会显示这些!要打印驱动程序上的所有元素,可以使用以下collect()方法首先将 RDD 带到驱动程序节点:rdd.collect().foreach(println)。不过,这可能会导致驱动程序内存不足,因为collect()将整个 RDD 提取到一台机器上;如果您只需要打印 RDD 的几个元素,更安全的方法是使用take(): rdd.take(100).foreach(println)。
使用键值对
虽然大多数 Spark 操作适用于包含任何类型对象的 RDD,但少数特殊操作仅适用于键值对的 RDD。最常见的是分布式“洗牌”操作,例如通过键对元素进行分组或聚合。
在 Scala 中,这些操作在包含Tuple2对象(语言中的内置元组,通过简单编写 来创建)的 RDD 上自动可用 (a, b)。键值对操作在 PairRDDFunctions类中可用,它自动包装元组的 RDD。
例如,以下代码使用reduceByKey键值对的操作来统计文件中每行文本出现的次数:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
counts.sortByKey()例如, 我们还可以使用按字母顺序对这些对进行排序,最后counts.collect()将它们作为对象数组返回到驱动程序。
注意:当使用自定义对象作为键值对操作的键时,必须确保自定义equals()方法带有匹配的hashCode()方法。有关完整详细信息,请参阅Object.hashCode() 文档中概述的合同。
在 Scala 中,这些操作在包含Tuple2对象(语言中的内置元组,通过简单编写 来创建)的 RDD 上自动可用 (a, b)。键值对操作在 PairRDDFunctions类中可用,它自动包装元组的 RDD。
例如,以下代码使用reduceByKey键值对的操作来统计文件中每行文本出现的次数:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
counts.sortByKey()例如, 我们还可以使用按字母顺序对这些对进行排序,最后counts.collect()将它们作为对象数组返回到驱动程序。
注意:当使用自定义对象作为键值对操作的键时,必须确保自定义equals()方法带有匹配的hashCode()方法。有关完整详细信息,请参阅Object.hashCode() 文档中概述的合同。
Transformations
下表列出了 Spark 支持的一些常见转换。有关详细信息,请参阅 RDD API 文档(Scala、 Java、 Python、 R)和配对 RDD 函数文档(Scala、 Java )。
map(func)
回一个新的分布式数据集,该数据集通过将源的每个元素传递给函数func来形成。【一对一】
filter(func)
返回一个新的数据集,该数据集是通过选择func返回 true的源元素而形成的。
flatMap(func)
与map类似,但每个输入项可以映射到0个或多个输出项(因此func应该返回一个Seq而不是单个项)。
mapPartitions(func)
与map类似,但在RDD的每个分区(块)上单独运行,因此当在T类型的RDD上运行时, func必须是 Iterator<T> => Iterator<U>类型。
mapPartitionsWithIndex(func)
与mapPartitions类似,但也为func提供了一个表示分区索引的整数值,因此当在T类型的RDD上运行时, func必须是(Int, Iterator<T>) => Iterator<U>类型。
sample(withReplacement, fraction, seed)
当调用 (K, V) 对数据集时,返回 (K, Iterable<V>) 对数据集。
注意:如果您要进行分组以便对每个键执行聚合(例如求和或求平均值),则使用reduceByKey或aggregateByKey会产生更好的性能。
注意:默认情况下,输出的并行度取决于父 RDD 的分区数量。您可以传递可选numPartitions参数来设置不同数量的任务。
注意:如果您要进行分组以便对每个键执行聚合(例如求和或求平均值),则使用reduceByKey或aggregateByKey会产生更好的性能。
注意:默认情况下,输出的并行度取决于父 RDD 的分区数量。您可以传递可选numPartitions参数来设置不同数量的任务。
union(otherDataset)
返回一个新数据集,其中包含源数据集中元素和参数的并集。
intersection(otherDataset)
回一个新的 RDD,其中包含源数据集中元素与参数的交集。
distinct([numPartitions]))
返回一个新数据集,其中包含源数据集的不同元素。
groupByKey([numPartitions])
当调用 (K, V) 对数据集时,返回 (K, Iterable<V>) 对数据集。
注意:如果您要进行分组以便对每个键执行聚合(例如求和或求平均值),则使用reduceByKey或aggregateByKey会产生更好的性能。
注意:默认情况下,输出的并行度取决于父 RDD 的分区数量。您可以传递可选numPartitions参数来设置不同数量的任务。
注意:如果您要进行分组以便对每个键执行聚合(例如求和或求平均值),则使用reduceByKey或aggregateByKey会产生更好的性能。
注意:默认情况下,输出的并行度取决于父 RDD 的分区数量。您可以传递可选numPartitions参数来设置不同数量的任务。
reduceByKey(func, [numPartitions])
当调用 (K, V) 对数据集时,返回 (K, V) 对数据集,其中每个键的值使用给定的reduce 函数func进行聚合,该函数的类型必须为 (V,V) => V. 与 中一样groupByKey,reduce 任务的数量可以通过可选的第二个参数进行配置。
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
当调用 (K, V) 对数据集时,返回 (K, U) 对数据集,其中每个键的值使用给定的组合函数和中性“零”值进行聚合。允许使用与输入值类型不同的聚合值类型,同时避免不必要的分配。与 中一样groupByKey,reduce 任务的数量可以通过可选的第二个参数进行配置。
sortByKey([ascending], [numPartitions])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join
当调用 (K, V) 和 (K, W) 类型的数据集时,返回 (K, (V, W)) 对的数据集,(V, W)是每个键的所有元素对。支持的外连接的类型有:leftOuterJoin通过、rightOuterJoin和ullOuterJoin。
cogroup
当调用 (K, V) 和 (K, W) 类型的数据集时,返回 (K, (Iterable<V>, Iterable<W>)) 元组的数据集。此操作也称为groupWith.
cartesian
当调用 T 和 U 类型的数据集时,返回 (T, U) 对(所有元素对)的数据集。
pipe
通过 shell 命令(例如 Perl 或 bash 脚本)对 RDD 的每个分区进行管道传输。RDD 元素被写入进程的 stdin,并且输出到其 stdout 的行作为字符串 RDD 返回。
coalesce
将 RDD 中的分区数量减少到 numPartitions。对于过滤大型数据集后更有效地运行操作很有用。
parttitions
随机重新整理 RDD 中的数据以创建更多或更少的分区并在它们之间进行平衡。这总是会打乱网络上的所有数据。
repartitionAndSortWithinPartitions
根据给定的分区程序对 RDD 进行重新分区,并在每个结果分区中按记录的键对记录进行排序。这比repartition在每个分区内调用然后排序更有效,因为它可以将排序下推到洗牌机制中。
Actions
reduce
使用函数func聚合数据集的元素(它接受两个参数并返回一个)。该函数应该是可交换的和关联的,以便可以正确地并行计算。
collect
在驱动程序中将数据集的所有元素作为数组返回。在返回足够小的数据子集的过滤器或其他操作之后,这通常很有用。
count
返回数据集中的元素数量。
first
返回数据集的第一个元素(类似于 take(1))。
take(n)
返回包含数据集前n 个元素的数组。
takeSample(withReplacement, num, [seed])
返回一个数组,其中包含数据集num 个元素的随机样本,有或没有替换,可以选择预先指定随机数生成器种子。
takeOrdered(n, [ordering])
使用自然顺序或自定义比较器返回 RDD 的前n 个元素。
saveAsTextFile(path)
将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统的给定路径中。这在实现 Hadoop 的 Writable 接口的键值对 RDD 上可用。在 Scala 中,它也适用于可隐式转换为 Writable 的类型(Spark 包括对 Int、Double、String 等基本类型的转换)。
saveAsSequenceFile(path)(Java and Scala)
将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统的给定路径中。这在实现 Hadoop 的 Writable 接口的键值对 RDD 上可用。在 Scala 中,它也适用于可隐式转换为 Writable 的类型(Spark 包括对 Int、Double、String 等基本类型的转换)。
saveAsObjectFile(path)(Java and Scala)
使用 Java 序列化以简单格式写入数据集的元素,然后可以使用 SparkContext.objectFile().
countByKey
仅适用于 (K, V) 类型的 RDD。返回 (K, Int) 对的哈希图以及每个键的计数。
foreach
对数据集的每个元素运行函数func 。这样做通常是为了防止副作用,例如更新累加器或与外部存储系统交互。
注意:修改累加器之外的变量foreach()可能会导致未定义的行为。有关更多详细信息,请参阅了解闭包。
注意:修改累加器之外的变量foreach()可能会导致未定义的行为。有关更多详细信息,请参阅了解闭包。
Spark RDD API 还公开了某些操作的异步版本,例如foreachAsyncfor foreach,它立即将 a 返回FutureAction给调用者,而不是在操作完成时阻塞。这可用于管理或等待操作的异步执行。
Shuffle Operations
Spark 中的某些操作会触发称为 shuffle 的事件。shuffle 是 Spark 重新分配数据的机制,以便数据在分区之间以不同的方式分组。这通常涉及跨Executors和machines复制数据,从而使Shuffle成为复杂且成本高昂的操作。
背景
为了了解Shuffle期间发生的情况,我们可以考虑该 reduceByKey操作的示例。reduceByKey操作会生成一个新的 RDD,其中单个键的所有值都组合成一个元组 (键以及针对与该键关联的所有值执行reduce函数的结果)。难点在于,单个键的所有值不一定都位于同一分区,甚至同一台机器上,但它们必须位于同一位置才能计算结果。
在 Spark 中,数据通常不会跨分区分布到特定操作的必要位置。在计算过程中,单个任务将在单个分区上运行 - 因此,为了组织单个reduce任务执行educeByKey的所有数据r,Spark需要执行all-to-all操作。它必须从所有分区中读取以查找所有键的所有值,然后将跨分区的值汇集在一起以计算每个键的最终结果 - 这称为shuffle。
虽然新打乱的数据的每个分区中的元素集是确定性的,分区本身的顺序也是确定性的,但这些元素的顺序却不是。如果需要在洗牌后得到可预测的有序数据,那么可以使用:
mapPartitions对每个分区进行排序,例如,.sorted
repartitionAndSortWithinPartitions有效地对分区进行排序,同时重新分区
sortBy制作一个全局有序的 RDD
可能会引起Suffffle的操作有, 重新repartitioin的操作,比如: repartition 和coalesce; ByKey 的操作(除了Counting的操作),比如groupByKey 和reduceByKey; join 的操作比如cogroup 和join.
在 Spark 中,数据通常不会跨分区分布到特定操作的必要位置。在计算过程中,单个任务将在单个分区上运行 - 因此,为了组织单个reduce任务执行educeByKey的所有数据r,Spark需要执行all-to-all操作。它必须从所有分区中读取以查找所有键的所有值,然后将跨分区的值汇集在一起以计算每个键的最终结果 - 这称为shuffle。
虽然新打乱的数据的每个分区中的元素集是确定性的,分区本身的顺序也是确定性的,但这些元素的顺序却不是。如果需要在洗牌后得到可预测的有序数据,那么可以使用:
mapPartitions对每个分区进行排序,例如,.sorted
repartitionAndSortWithinPartitions有效地对分区进行排序,同时重新分区
sortBy制作一个全局有序的 RDD
可能会引起Suffffle的操作有, 重新repartitioin的操作,比如: repartition 和coalesce; ByKey 的操作(除了Counting的操作),比如groupByKey 和reduceByKey; join 的操作比如cogroup 和join.
性能影响
Shuffle 是个代价很大的操作,因为它涉及磁盘I/O, 数据序列化,以及网络I/O. 为了组织Shuffle 的数据,Spark生成一系列的Task, 包括用于转换数据的map tasks 和永宇聚集数据的reduce tasks。 该术语来自 MapReduce,但与 Spark 的map和reduce操作没有直接关系。
在内部,各个map任务的结果都会保存在内存中,直到它们无法容纳为止。然后,根据目标分区对它们进行排序并写入单个文件。在reduce端,任务读取相关的排序块。
某些Shuffle操作可能会消耗大量堆内存,因为它们在传输之前或之后使用内存中数据结构来组织记录。具体来说,reduceByKey and aggregateByKey 在map端创建这些结构,然后ByKey操作在reduce端生成这些结构。当内存无法容纳数据时,Spark 会将这些表溢出到磁盘,从而产生额外的磁盘 I/O 开销并增加垃圾收集。
可以通过调整各种配置参数来调整Shuffle行为。请参阅Spark 配置指南中的“Shuffle行为”部分。
在内部,各个map任务的结果都会保存在内存中,直到它们无法容纳为止。然后,根据目标分区对它们进行排序并写入单个文件。在reduce端,任务读取相关的排序块。
某些Shuffle操作可能会消耗大量堆内存,因为它们在传输之前或之后使用内存中数据结构来组织记录。具体来说,reduceByKey and aggregateByKey 在map端创建这些结构,然后ByKey操作在reduce端生成这些结构。当内存无法容纳数据时,Spark 会将这些表溢出到磁盘,从而产生额外的磁盘 I/O 开销并增加垃圾收集。
可以通过调整各种配置参数来调整Shuffle行为。请参阅Spark 配置指南中的“Shuffle行为”部分。
RDD持久化
Spark 最重要的功能之一是跨操作在内存中持久保存(或缓存)数据集。当您持久化 RDD 时,每个节点都会将其计算的任何分区存储在内存中,并在该数据集(或从其派生的数据集)上的其他操作中重用它们。这使得未来的操作速度更快(通常快 10 倍以上)。缓存是迭代算法和快速交互使用的关键工具。
您可以使用persist()或cache()方法将RDD 标记为要持久化。第一次在操作中计算时,它将保存在节点的内存中。Spark 的缓存是容错的——如果 RDD 的任何分区丢失,它将使用最初创建它的转换自动重新计算。
此外,每个持久化的 RDD 都可以使用不同的存储级别进行存储,例如,允许您将数据集持久化在磁盘上、将其持久化在内存中、但作为序列化的 Java 对象(以节省空间)、跨节点复制。这些级别是通过将 StorageLevel对象(Scala、 Java、 Python)传递给persist(). 该cache()方法是使用默认存储级别的简写,即StorageLevel.MEMORY_ONLY(将反序列化对象存储在内存中)。完整的存储级别集是:
您可以使用persist()或cache()方法将RDD 标记为要持久化。第一次在操作中计算时,它将保存在节点的内存中。Spark 的缓存是容错的——如果 RDD 的任何分区丢失,它将使用最初创建它的转换自动重新计算。
此外,每个持久化的 RDD 都可以使用不同的存储级别进行存储,例如,允许您将数据集持久化在磁盘上、将其持久化在内存中、但作为序列化的 Java 对象(以节省空间)、跨节点复制。这些级别是通过将 StorageLevel对象(Scala、 Java、 Python)传递给persist(). 该cache()方法是使用默认存储级别的简写,即StorageLevel.MEMORY_ONLY(将反序列化对象存储在内存中)。完整的存储级别集是:
存储级别
MEMORY_ONLY: 将 RDD 作为反序列化的 Java 对象存储在 JVM 中。如果 RDD 不适合内存,某些分区将不会被缓存,并且会在每次需要时动态重新计算。这是默认级别。
MEMORY_AND_DISK(Java and Scala): 将 RDD 作为反序列化的 Java 对象存储在 JVM 中。如果 RDD 无法容纳在内存中,请将无法容纳的分区存储在磁盘上,并在需要时从那里读取它们。
MEMORY_AND_DISK_SER(Java and Scala): 将 RDD 存储为序列化Java 对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,特别是在使用 快速序列化器时,但读取时更需要 CPU 密集型。
DISK_ONLY: 仅将 RDD 分区存储在磁盘上。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: 与上面的级别相同,但在两个集群节点上复制每个分区。
OFF_HEAP (experimental): 与MEMORY_ONLY_SER类似,但将数据存储在 堆外内存中。这需要启用堆外内存。
注意:在Python中,存储的对象将始终使用 Pickle库进行序列化,因此是否选择序列化级别并不重要。Python 中可用的存储级别包括 MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, and DISK_ONLY_2.
Spark 还会自动在 shuffle 操作中保留一些中间数据(例如reduceByKey),即使用户没有调用persist. 这样做是为了避免在Suffle期间节点失败时重新计算整个输入。如果用户打算重用生成的 RDD,我们仍然建议persist他们调用它。
选择哪个存储级别?
Spark 的存储级别旨在提供内存使用和 CPU 效率之间的不同权衡。我们建议通过以下过程来选择一个:
如果您的 RDD 适合默认存储级别 ( MEMORY_ONLY),请保持这种状态。这是 CPU 效率最高的选项,允许 RDD 上的操作尽可能快地运行。
如果没有,请尝试使用MEMORY_ONLY_SER并选择一个快速序列化库,以使对象更加节省空间,但访问速度仍然相当快。(Java 和 Scala)
不要溢出到磁盘,除非计算数据集的函数很昂贵,或者它们过滤大量数据。否则,重新计算分区可能与从磁盘读取分区一样快。
如果您想要快速故障恢复(例如,如果使用 Spark 来处理来自 Web 应用程序的请求),请使用复制存储级别。所有存储级别都通过重新计算丢失的数据来提供完整的容错能力,但复制存储级别允许您继续在 RDD 上运行任务,而无需等待重新计算丢失的分区。
删除数据
Spark 自动监控每个节点上的缓存使用情况,并以最近最少使用 (LRU) 方式删除旧数据分区。如果您想手动删除 RDD,而不是等待它从缓存中删除,请使用该RDD.unpersist()方法。
共享变量
通常,当传递给 Spark 操作(例如map或reduce)的函数在远程集群节点上执行时,它会作用于该函数中使用的所有变量的单独副本。这些变量被复制到每台机器,并且远程机器上的变量更新不会传播回驱动程序。支持跨任务的通用读写共享变量效率很低。然而,Spark 确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。
广播变量
广播变量允许程序员在每台机器上缓存只读变量,而不是随任务传送它的副本。例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。Spark还尝试使用高效的广播算法来分发广播变量,以降低通信成本。
Spark 操作通过一组阶段执行,这些阶段由分布式“shuffle”操作分隔开。Spark自动广播每个阶段内任务所需的公共数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。这意味着只有当跨多个阶段的任务需要相同的数据或者以反序列化形式缓存数据很重要时,显式创建广播变量才有用。
广播变量是通过调用从变量v创建的SparkContext.broadcast(v)。广播变量是v的包装器,可以通过调用value 方法来访问其值。下面的代码显示了这一点:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
创建广播变量后,应使用v来代替集群上运行的任何函数中的值,以便v不会多次将其发送到节点。此外,对象 v在广播后不应被修改,以确保所有节点获得广播变量的相同值(例如,如果稍后将该变量传送到新节点)。
Spark 操作通过一组阶段执行,这些阶段由分布式“shuffle”操作分隔开。Spark自动广播每个阶段内任务所需的公共数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。这意味着只有当跨多个阶段的任务需要相同的数据或者以反序列化形式缓存数据很重要时,显式创建广播变量才有用。
广播变量是通过调用从变量v创建的SparkContext.broadcast(v)。广播变量是v的包装器,可以通过调用value 方法来访问其值。下面的代码显示了这一点:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
创建广播变量后,应使用v来代替集群上运行的任何函数中的值,以便v不会多次将其发送到节点。此外,对象 v在广播后不应被修改,以确保所有节点获得广播变量的相同值(例如,如果稍后将该变量传送到新节点)。
累加器
累加器是仅通过关联和交换运算“添加”的变量,因此可以有效地支持并行。它们可用于实现计数器(如在 MapReduce 中)或求和。Spark 原生支持数字类型的累加器,程序员可以添加对新类型的支持。
作为用户,您可以创建命名或未命名的累加器。如下图所示,一个命名的累加器(在本例中counter)将显示在修改该累加器的阶段的 Web UI 中。Spark 在“任务”表中显示由任务修改的每个累加器的值。
在 UI 中跟踪累加器对于了解运行阶段的进度很有用(注意:Python 尚不支持)。
作为用户,您可以创建命名或未命名的累加器。如下图所示,一个命名的累加器(在本例中counter)将显示在修改该累加器的阶段的 Web UI 中。Spark 在“任务”表中显示由任务修改的每个累加器的值。
在 UI 中跟踪累加器对于了解运行阶段的进度很有用(注意:Python 尚不支持)。
可以通过调用SparkContext.longAccumulator()或SparkContext.doubleAccumulator() 分别累加 Long 或 Double 类型的值来创建数字累加器。然后可以使用该方法将在集群上运行的任务添加到集群中add。然而,他们无法读取其value。只有驱动程序可以使用累加器的方法读取累加器的值value。
下面的代码显示了一个用于将数组元素相加的累加器:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
scala> accum.value
res2: Long = 10
虽然此代码使用了对 Long 类型累加器的内置支持,但程序员还可以通过子类化AccumulatorV2来创建自己的类型。AccumulatorV2 抽象类有几个必须重写的方法:reset将累加器重置为零、add将另一个值添加到累加器中、 merge将另一个相同类型的累加器合并到此累加器中。其他必须重写的方法包含在API 文档中。例如,假设我们有一个MyVector表示数学向量的类,我们可以这样写:
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
请注意,当程序员定义自己的 AccumulatorV2 类型时,生成的类型可能与添加的元素的类型不同。
对于仅在操作内部执行的累加器更新,Spark 保证每个任务对累加器的更新只会应用一次,即重新启动的任务不会更新该值。在转换中,用户应该意识到,如果重新执行任务或作业阶段,每个任务的更新可能会应用多次。
累加器不会改变 Spark 的惰性求值模型。如果它们是在 RDD 的操作中更新的,则只有当 RDD 作为操作的一部分进行计算时,它们的值才会更新。因此,在诸如 之类的惰性转换中进行累加器更新时,不能保证执行map()。下面的代码片段演示了这个属性:
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
下面的代码显示了一个用于将数组元素相加的累加器:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
scala> accum.value
res2: Long = 10
虽然此代码使用了对 Long 类型累加器的内置支持,但程序员还可以通过子类化AccumulatorV2来创建自己的类型。AccumulatorV2 抽象类有几个必须重写的方法:reset将累加器重置为零、add将另一个值添加到累加器中、 merge将另一个相同类型的累加器合并到此累加器中。其他必须重写的方法包含在API 文档中。例如,假设我们有一个MyVector表示数学向量的类,我们可以这样写:
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
请注意,当程序员定义自己的 AccumulatorV2 类型时,生成的类型可能与添加的元素的类型不同。
对于仅在操作内部执行的累加器更新,Spark 保证每个任务对累加器的更新只会应用一次,即重新启动的任务不会更新该值。在转换中,用户应该意识到,如果重新执行任务或作业阶段,每个任务的更新可能会应用多次。
累加器不会改变 Spark 的惰性求值模型。如果它们是在 RDD 的操作中更新的,则只有当 RDD 作为操作的一部分进行计算时,它们的值才会更新。因此,在诸如 之类的惰性转换中进行累加器更新时,不能保证执行map()。下面的代码片段演示了这个属性:
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
部署到集群
application submission guide介绍了如何向集群提交申请.简而言之,一旦您将应用程序打包到 JAR(对于 Java/Scala)或一组.py或.zip文件(对于 Python)中,该bin/spark-submit脚本就可以让您将其提交给任何支持的集群管理器。
从 Java/Scala 启动 Spark 作业
org.apache.spark.launcher 包提供了使用简单的 Java API 将 Spark 作业作为子进程启动的类。
单元测试
Spark 适合使用任何流行的单元测试框架进行单元测试。只需在您的测试中创建一个,SparkContext并将主 URL 设置为local,运行您的操作,然后调用SparkContext.stop()将其拆除。确保在finally块或测试框架tearDown方法内停止Context,因为 Spark 不支持在同一程序中同时运行两个Context。
其他
您可以在 Spark 网站上查看一些示例 Spark 程序。此外,Spark 在examples目录中包含多个示例(Scala、 Java、 Python、 R)。您可以通过将类名传递给 Spark 的bin/run-example脚本来运行 Java 和 Scala 示例;例如:
./bin/run-example SparkPi
对于 Python 示例,请spark-submit改用:
./bin/spark-submit examples/src/main/python/pi.py
对于 R 示例,请spark-submit改用:
./bin/spark-submit examples/src/main/r/dataframe.R
为了帮助优化您的程序,配置和 调整指南提供了有关最佳实践的信息。它们对于确保您的数据以有效的格式存储在内存中尤其重要。为了帮助部署,集群模式概述描述了分布式操作中涉及的组件和支持的集群管理器。
最后,完整的 API 文档提供了 Scala、Java、Python和R版本。
./bin/run-example SparkPi
对于 Python 示例,请spark-submit改用:
./bin/spark-submit examples/src/main/python/pi.py
对于 R 示例,请spark-submit改用:
./bin/spark-submit examples/src/main/r/dataframe.R
为了帮助优化您的程序,配置和 调整指南提供了有关最佳实践的信息。它们对于确保您的数据以有效的格式存储在内存中尤其重要。为了帮助部署,集群模式概述描述了分布式操作中涉及的组件和支持的集群管理器。
最后,完整的 API 文档提供了 Scala、Java、Python和R版本。
0 条评论
下一页