Apache Spark - Core 编程

Spark Core 是整个项目的基础。 它提供分布式任务分派、调度和基本 I/O 功能。 Spark 使用称为 RDD(弹性分布式数据集)的专用基础数据结构,它是跨机器分区的数据的逻辑集合。 可以通过两种方式创建 RDD; 一是通过引用外部存储系统中的数据集,二是通过在现有 RDD 上应用转换(例如 map、filter、reducer、join)。

RDD 抽象通过语言集成的 API 公开。 这简化了编程复杂性,因为应用程序操作 RDD 的方式类似于操作本地数据集合。


Spark Shell

Spark 提供了一个交互式的 shell − 交互式分析数据的强大工具。 它以 Scala 或 Python 语言提供。 Spark 的主要抽象是称为弹性分布式数据集 (RDD) 的分布式项目集合。 RDD 可以从 Hadoop 输入格式(例如 HDFS 文件)或通过转换其他 RDD 来创建。

打开 Spark Shell

以下命令用于打开 Spark shell。

$ spark-shell

创建简单的 RDD

让我们从文本文件创建一个简单的 RDD。 使用以下命令创建一个简单的 RDD。

scala> val inputfile = sc.textFile(“input.txt”)

上述命令的输出是

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD API 引入了少量Transformations 和少量Actions 来操作RDD。


RDD 转换

RDD 转换返回指向新 RDD 的指针,并允许您在 RDD 之间创建依赖关系。 依赖链(String of Dependencies)中的每个RDD都有一个计算其数据的函数,并有一个指向其父RDD的指针(依赖)。

Spark 是惰性的,因此除非您调用一些会触发作业创建和执行的转换或操作,否则不会执行任何操作。 请看以下单词计数示例的片段。

因此,RDD 转换不是一组数据,而是程序中的一个步骤(可能是唯一的步骤)告诉 Spark 如何获取数据以及如何处理数据。

下面给出的是 RDD 转换列表。

S.No 转换与描述
1

map(func)

返回一个新的分布式数据集,通过函数 func 传递源的每个元素而形成。

2

filter(func)

返回通过选择 func 返回 true 的源元素形成的新数据集。

3

flatMap(func)

类似于 map,但每个输入项可以映射到 0 个或多个输出项(因此 func 应该返回一个 Seq 而不是单个项)。

4

mapPartitions(func)

与map类似,但在RDD的每个分区(块)上单独运行,所以func在T类型的RDD上运行时必须是 Iterator<T> ⇒ Iterator<U> 类型。

5

mapPartitionsWithIndex(func)

与map Partitions类似,但也为func提供了一个表示分区索引的整数值,因此func在T类型的RDD上运行时必须为 type (Int, Iterator<T>) ⇒ Iterator<U> 。

6

sample(withReplacement, fraction, seed)

使用给定的随机数生成器种子对数据的 fraction 进行采样,无论是否替换。

7

union(otherDataset)

返回一个新数据集,其中包含源数据集中元素和参数的并集。

8

intersection(otherDataset)

返回一个新的 RDD,其中包含源数据集中元素和参数的交集。

9

distinct([numTasks])

返回包含源数据集的不同元素的新数据集。

10

groupByKey([numTasks])

在 (K, V) 对的数据集上调用时,返回 (K, Iterable<V>) 对的数据集。

注意 − 如果您正在分组以便对每个键执行聚合(例如求和或平均),则使用 reduceByKey 或 aggregateByKey 将产生更好的性能。

11

reduceByKey(func, [numTasks])

在 (K, V) 对的数据集上调用时,返回 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数 func 聚合,该函数必须是 type (V, V) ⇒ V. 与 groupByKey 一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

当在 (K, V) 对的数据集上调用时,返回 (K, U) 对的数据集,其中每个键的值使用给定的组合函数和中性"零"值聚合。 允许不同于输入值类型的聚合值类型,同时避免不必要的分配。 与 groupByKey 一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

13

sortByKey([ascending], [numTasks])

当在 K 实现 Ordered 的 (K, V) 对数据集上调用时,返回由键按升序或降序排序的 (K, V) 对数据集,如布尔升序参数中指定的那样。

14

join(otherDataset, [numTasks])

当在 (K, V) 和 (K, W) 类型的数据集上调用时,返回 (K, (V, W)) 对的数据集,其中每个键的所有元素对。 通过 leftOuterJoin、rightOuterJoin 和 fullOuterJoin 支持外连接。

15

cogroup(otherDataset, [numTasks])

在 (K, V) 和 (K, W) 类型的数据集上调用时,返回 (K, (Iterable<V>, Iterable<W>)) 元组的数据集。 此操作也称为 group With。

16

cartesian(otherDataset)

在 T 和 U 类型的数据集上调用时,返回 (T, U) 对(所有元素对)的数据集。

17

pipe(command, [envVars])

通过 shell 命令对 RDD 的每个分区进行管道传输,例如 Perl 或 bash 脚本。 RDD 元素被写入进程的标准输入,输出到标准输出的行作为字符串的 RDD 返回。

18

coalesce(numPartitions)

将 RDD 中的分区数减少到 numPartitions。 对于过滤大型数据集后更有效地运行操作很有用。

19

repartition(numPartitions)

随机重新排列 RDD 中的数据以创建更多或更少的分区并在它们之间进行平衡。 这总是对网络上的所有数据进行洗牌。

20

repartitionAndSortWithinPartitions(partitioner)

根据给定的分区器对 RDD 进行重新分区,并在每个生成的分区中,按记录的键对记录进行排序。 这比调用 repartition 然后在每个分区内排序更有效,因为它可以将排序向下推到 shuffle 机器中。


操作

下表给出了返回值的操作列表。

S.No 操作 & 描述
1

reduce(func)

使用函数 func 聚合数据集的元素(它接受两个参数并返回一个)。 该函数应该是可交换的和关联的,以便可以并行正确计算。

2

collect()

在驱动程序中将数据集的所有元素作为数组返回。 这通常在过滤器或其他返回足够小的数据子集的操作之后很有用。

3

count()

返回数据集中元素的数量。

4

first()

返回数据集的第一个元素(类似于 take (1))。

5

take(n)

返回包含数据集前 n 个元素的数组。

6

takeSample (withReplacement,num, [seed])

返回一个数组,其中包含数据集的 num 个元素的随机样本,有或没有替换,可选地预先指定随机数生成器种子。

7

takeOrdered(n, [ordering])

使用自然顺序或自定义比较器返回 RDD 的前 n 个元素。

8

saveAsTextFile(path)

将数据集的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统的给定目录中。 Spark 在每个元素上调用 toString 以将其转换为文件中的一行文本。

9

saveAsSequenceFile(path) (Java and Scala)

将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径中。 这在实现 Hadoop 的 Writable 接口的键值对的 RDD 上可用。 在 Scala 中,它也可用于可隐式转换为 Writable 的类型(Spark 包括基本类型的转换,如 Int、Double、String 等)。

10

saveAsObjectFile(path) (Java and Scala)

使用 Java 序列化以简单格式写入数据集的元素,然后可以使用 SparkContext.objectFile() 加载。

11

countByKey()

仅适用于 (K, V) 类型的 RDD。 返回 (K, Int) 对的哈希图以及每个键的计数。

12

foreach(func)

对数据集的每个元素运行一个函数 func。 这通常是针对诸如更新累加器或与外部存储系统交互等副作用而完成的。

注意 − 在 foreach() 之外修改除累加器以外的变量可能会导致未定义的行为。 有关更多详细信息,请参阅了解闭包。


使用 RDD 编程

让我们通过一个例子来看看RDD编程中少数RDD转换和动作的实现。

示例

考虑一个字数统计示例 − 它计算出现在文档中的每个单词。 将以下文本视为输入,并在主目录中保存为 input.txt 文件。

input.txt − 输入文件。

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

按照下面给出的过程执行给定的示例。

打开 Spark-Shell

以下命令用于打开 spark shell。 通常,spark 是使用 Scala 构建的。 因此,Spark 程序运行在 Scala 环境中。

$ spark-shell

如果 Spark shell 成功打开,您将找到以下输出。 查看输出"Spark context available as sc"的最后一行,这意味着 Spark 容器会自动创建名为 sc 的 spark 上下文对象。 在开始程序的第一步之前,应该创建 SparkContext 对象。

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

创建一个 RDD

首先,我们必须使用 Spark-Scala API 读取输入文件并创建一个 RDD。

以下命令用于从给定位置读取文件。 在这里,新的 RDD 以 inputfile 的名称创建。 在 textFile("") 方法中作为参数给出的字符串是输入文件名的绝对路径。 但是,如果只给出文件名,则表示输入文件在当前位置。

scala> val inputfile = sc.textFile("input.txt")

执行字数转换

我们的目标是计算文件中的单词。 创建一个平面地图,将每一行分成单词 (flatMap(line ⇒ line.split(" "))。

接下来,使用映射函数 (map(word ⇒ (word, 1)) 将每个单词读取为值为 '1' (<key, value> = <word,1>) 的键。

最后,通过添加相似键 (reduceByKey(_+_)) 的值来减少这些键。

以下命令用于执行字数统计逻辑。 执行这个之后,你不会发现任何输出,因为这不是一个动作,这是一个转换; 指向一个新的 RDD 或告诉 spark 如何处理给定的数据)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

当前 RDD

在使用 RDD 时,如果您想了解当前的 RDD,请使用以下命令。 它将向您显示有关当前 RDD 及其调试依赖项的描述。

scala> counts.toDebugString

缓存转换

您可以使用persist() 或cache() 方法将RDD 标记为持久化。 第一次在动作中计算时,它将保存在节点的内存中。 使用以下命令将中间转换存储在内存中。

scala> counts.cache()

应用动作

应用一个动作,比如存储所有的转换,结果到一个文本文件中。 saveAsTextFile(" ") 方法的 String 参数是输出文件夹的绝对路径。 尝试使用以下命令将输出保存在文本文件中。 在以下示例中,"输出"文件夹位于当前位置。

scala> counts.saveAsTextFile("output")

检查输出

打开另一个终端以转到主目录(在另一个终端中执行 spark)。 使用以下命令检查输出目录。

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

以下命令用于查看 Part-00000 文件的输出。

[hadoop@localhost output]$ cat part-00000

输出

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1) 

以下命令用于查看 Part-00001 文件的输出。

[hadoop@localhost output]$ cat part-00001 

输出

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1) 

UN 持久化存储

在 UN-persisting 之前,如果您想查看该应用程序使用的存储空间,请在浏览器中使用以下 URL。

http://localhost:4040

您将看到以下屏幕,其中显示了在 Spark shell 上运行的应用程序使用的存储空间。

存储空间

如果要 UN-persist 特定 RDD 的存储空间,请使用以下命令。

Scala> counts.unpersist()

You will see the output as follows −

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

要验证浏览器中的存储空间,请使用以下 URL。

http://localhost:4040/

您将看到以下屏幕。 它显示了在 Spark shell 上运行的应用程序使用的存储空间。

应用存储空间