Spark学习笔记之入门篇一

概括

每个Spark程序都包括一个主驱动程序和在集群上执行的并行操作。
Spark一个重要的抽象模型是RDD(resilient distributed dataset),中文翻译呢,就是弹性分布式数据集,弹性呢,指的其实是可以在集群中跨节点和分区。RDDs的创建呢,可以通过一个本地文件,或者HDFS,或者一个现有的Scala集合,甚至其他的RDD通过transformation(RDD的一个基本操作之一,后面会讲到)得来。用户也可以把一个RDD持久化到内存中,可以在并行操作中更高效的复用。还有非常重要的一点,RDDs出现节点故障时可以自动恢复。

Spark另一个抽象概念是可以在并行操作中共享变量,默认Spark在不同节点的任务集并行执行一个方法的时候,它会复制每一个变量到每一个任务中,有时变量需要跨任务共享。

Spark支持两种共享变量模式:

  • 广播变量(broadcast variables):
    • 可以缓存值在所有节点的内存中。
  • 累加器(accumulators):
    • 只能通过“added”得到的变量,类似计数器或求和。

安装

这里的安装只说一下Linux和Mac OS X系统下的安装。

OS X

OS X就简单了,直接homebrew无脑安装,什么?你还不知道homebrew? 点我安装

安装scala

1
brew install scala

安装git

因为pgg自带git,所以这一步就不需要啦

安装spark

1
brew install spark

呵呵,so easy?

Linux

Linux安装的话,也一次是上述步骤

安装scala

官网下载最新的版本,解压后放在本地/opt下。
/etc/profile里添加:

1
2
export SCALA_HOME=/opt/scala-2.9.1.final
export PATH=$SCALA_HOME/bin:$PATH

安装git

官网 查看不同版本Linux下git的安装,使用默认配置。

安装spark

下载最新源码:

1
git clone git://github.com/mesos/spark.git

得到目录spark后,进入spark目录,进入conf子目录,将 spark-env.sh-template 重命名为spark-env.sh,并添加以下代码行:

1
export SCALA_HOME=/opt/scala-2.9.1.final

回到spark目录,开始编译,运行

1
$ sbt/sbt update compile

这条命令会联网下载很多jar,然后会对spark进行编译,编译完成会提示success

Spark自带了一些例子程序,在examples/src/main目录,OS X是libexec/examples/src/main,在要运行的话可以在Spark根目录执行bin/run-example <class> [params]口令,例如计算pi值

1
./bin/run-example SparkPi 10

执行下列命令可以在命令行启动交互:

1
./bin/spark-shell

然后启动日志里会有这么一句:

1
SparkUI: Started SparkUI at http://172.18.1.101:4040

在浏览器输入上述地址则可以在浏览器中查看执行的job。

spark-shell启动后可以执行scala命令。

引入

在Java中引入spark的话,可以在pom.xml中加入:

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
</dependency>

如果你想在HDFS集群中访问的话,可以添加一个对应版本的hadoop-client的依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version><your-hdfs-version></version>
</dependency>

Java类中需要引入的包基本如下所示:

1
2
3
4
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

RDD

RDDs支持两种类型的操作:

  • transformations:
    • 根据一个现有的RDD生成一个新的RDD
  • actions:
    • 计算返回结果

例如,map就是通过对一个数据集中每个元素执行指定的方法返回一个新的RDD作为结果。那reduce呢,就是对数据集中的元素通过指定的方法进行合并然后返回结果给驱动程序。

所有的 transformations 操作都是延迟计算的,就是说不会马上计算出结果,transformations 只有在一个 action 操作需要返回结果的时候才会被计算。
这个设计可以让Spark有非常高的性能,例如我们能实现通过map生成一个RDD,用作于在reduce中计算,然后只把计算结果返回给驱动程序,而不是整个映射的数据集。

默认的,每个转换过的RDD都有可能在执行action的时候被重新计算,所以你也可以调用persist()或者 cache()把RDD持久化到内存中。这样的话下次访问的话将会更快,Spark同样也支持持久化到磁盘,以及跨越节点复制。

示例

sum

下面将通过示例代码逐步讲解Spark如何使用以及如果操作RDD,语言为Java:

1
2
3
4
5
6
SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> list = Arrays.asList(1,2,3,4,5);
JavaRDD<Integer> distData = sc.parallelize(list);
int sum = distData.reduce((a, b) -> a + b);
System.out.println(sum);

上面这段代码实现了求list里所有元素的和。
我们来逐条分析:

  • SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
  • JavaSparkContext sc = new JavaSparkContext(conf);

写一个Spark程序首先要做的是创建一个JavaSparkContext对象,这个对象需要加载SparkConfSparkConf里包含了应用程序的相关信息。
参数test是展示在集群UI上的应用名称。
参数local表示程序运行在本地模式,所以,这个值还阔以是Spark,Mesos或YARN集群的URL,关于集群的配置后续集群篇中会再提到。

  • JavaRDD<Integer> distData = sc.parallelize(list);
    第三行代码new一个list就不用多说了,看第四行,JavaRDD 就是一个Java的RDD对象。
    Java常用的RDD对象有以下三种:
    • JavaRDD: 普通RDD对象,也是最常用的RDD对象,存储字符串,整形等类型数据。
    • JavaPairRDD: K-V的RDD对象,存储Key-Value对象。
    • JavaDoubleRDD: double类型的RDD对象,存储浮点型和长整形对象。

上述第四行代码中一个RDD对象distData通过调用JavaSparkContext’s parallelize()方法由一个现有的集合list创建。
JavaSparkContext创建RDD常用的方法有以下两种其实是我暂时就会这两种

  • public JavaRDD parallelize(List list)
    • 用于通过集合创建RDD。
  • public JavaRDD textFile(String path)
    • 从文件中创建RDD。path参数为文件的绝对路径。

RDD还有一个很重要的参数就是partitions(分片)的数量,Spark在集群上的每个分片运行一个任务,你可以在集群上给每个CPU设置2-4个分片。通常,Spark会基于集群状况自动设置分片数量,当然你也可以通过上述两个方法的第二个参数来传入要设定的分片数量。如:sc.parallelize(list,10)

  • int sum = distData.reduce((a, b) -> a + b);
    reduce()是一个action操作,Spark 把计算分成多个任务(task),并且让它们运行在多个机器上。每台机器都运行自己的reduce 部分。然后仅仅将结果返回。
    上述的方法内部是一个lambda表达式,表示把两个元素相加。
    只有JDK1.8及之上的版本才支持lambda语法,如果较低版本的话可以实现接口org.apache.spark.api.java.function

上述代码运行结果是 15

wordCount

下面再展示一个wordCount的例子:

1
2
3
4
5
6
SparkConf conf = new SparkConf().setAppName("wordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> distFile = sc.textFile("/Users/apple/Documents/spark");
JavaRDD<String> words = distFile.flatMap(s -> Arrays.asList(s.split(" ")));
JavaPairRDD<String,Integer> results = words.mapToPair(word -> new Tuple2<>(word, 1)).reduceByKey((a, b) -> a + b);
results.collection().forEach(tuple -> System.out.println("flatMap" + tuple._1 + ":" + tuple._2));

可以看到通过Spark实现的wordCount,要远远比Hadoop的Map Reduce实现的要简单很多,而且语法和实现都更加清晰易懂。

我们从第三行开始分析:

  • JavaRDD<String> distFile = sc.textFile("/Users/apple/Documents/spark");
    /Users/apple/Documents/spark文件创建了一个RDD对象。

  • JavaRDD<String> words = distFile.flatMap(s -> Arrays.asList(s.split(" ")));
    调用flatMap方法通过执行指定的计算把RDD进行transformations操作。
    这个指定的计算在上述代码里具体指的就是把每一行的字符串以空格切分返回字符串数组。

  • JavaPairRDD<String,Integer> results = words.mapToPair(word -> new Tuple2<>(word, 1)).reduceByKey((a, b) -> a + b);
    这里连续进行了两次操作,显示调用mapToPair方法记录K-V,每个单词出现一次就记一次。
    然后再调用reduceByKey方法把相同K的记录进行合并。

  • results.collection().forEach(tuple -> System.out.println("flatMap" + tuple._1 + ":" + tuple._2));
    rdd.collect().foreach(println)可以把RDD的元素打印出来,但官方文档提到调用collect()可能会造成OOM(out of memory),因为这个方法会把整个集群分布的RDD都取到一个物理机上,如果需要打印一些元素,可以调用rdd.take(100).foreach(println)来看效果

下章将会更深入的学习Spark。