第十周 第3章 spark实战: 单词统计
WordCount程序
1 | 首先看来一个快速入门案例,单词计数 |
1 | 注意:由于Spark支持Java、Scala这些语言,目前在企业中大部分公司都是使用Scala语言进行开发,个别公司会使用java进行开发,为了加深大家对Spark的理解,也满足java老程序员的需求,针对本课程中的案例,我们都会先基于Scala代码进行详细的讲解,然后再使用java代码重新实现一遍 |
Scala代码开发
maven项目的创建
1 | 下面来创建一个maven项目,集成java和scala的sdk |





1 | 最后需要添加Spark的maven依赖 |



1 | <dependency> |
1 | 注意:我们现在引入的Scala SDK是2.12版本的,建议大家再新增一个2.11版本的scala SDK。 |
scala代码
1 | package com.imooc.scala |
1 | 这里master设置的是local表示直接在本地临时创建一个集群(方便调试),就不用打包放到linux上spark集群运行 |
1 | 执行代码,结果如下 |
总结
1 | 我们再来总结一下代码中这几个RDD中的数据结构 |
1 | val wordsRDD = linesRDD.flatMap(_.split(" ")) |
1 | val pairRDD = wordsRDD.map((_,1)) |
1 | val wordCountRDD = pairRDD.reduceByKey(_ + _) |
java代码
1 | package com.imooc.java; |
总结
1 | 在编写一行JAVA语句时,有返回值的方法已经决定了返回对象的类型和泛型类型,我们只需要给这个对象起个名字就行。 |
任务提交
1 | 针对任务的提交有这么几种形式 |
使用idea
1 | 直接在idea中执行,方便在本地环境调试代码 |
使用spark-submit
添加打包所用依赖
1 | 使用spark-submit提交到集群执行,实际工作中会使用这种方式 |
1 | <build> |
1 | 注意:这里面的scala版本信息要使用2.11,因为这个spark安装包中依赖的scala是2.11的。 |
1 | 然后把spark-core依赖的作用域设置为provided(因为这是提交到集群运行,spark已经有了spark核心的jar包) |
修改代码
1 | 修改代码中的输入文件路径信息,因为这个时候无法读取windows中的数据了,把代码修改成动态接收输入文件路径 |
1 | //第一步:创建SparkContext |
1 | 修改WordCountJava |
打包
1 | mvn clean package -DskipTests |

上传,提交任务
1 | # 提交任务格式 |
1 | 注意:为了能够方便使用spark-submit脚本,需要在/etc/profile中配置SPARK_HOME环境变量 |
1 | 由于spark-submit命令后面的参数有点多,所以在这我们最好是写一个脚本去提交任务 |
1 | 在提交任务之前还需要先把hadoop集群启动了,以及对应的historyserver进程(除了客户端节点,所有集群节点都需要启动) |
1 | [root@bigdata04 sparkjars]# sh -x wordCountJob.sh |
1 | 这就是第二种方式,使用spark-submit的方式提交任务到集群中执行 |
使用spark-shell
1 | 这种方式方便在集群环境中调试代码 |
1 | 注意:使用spark-shell的时候,也可以选择指定开启本地spark集群,或者连接standalone集群,或者使用on yarn模式,都是可以的 |
local
1 | 默认是使用local模式开启本地集群,在spark-shell命令行中sparkContex是已经创建好的了,可以直接通过sc使用 |

1 | 输入需要调试执行的代码 |
1 | scala> val linesRDD = sc.textFile("/test/hello.txt") |
1 | 通过-h参数,可以查看spark-shell后面可以跟的参数 |
on yarn
1 | 我们尝试使用on yarn模式 |
standalone
1 | 如果想要使用spark-shell连接spark的standalone集群的话,只需要通过–master参数指定集群主节点的url信息即可。 |
Spark historyServer
1 | 刚才我们使用on yarn模式的时候会发现看不到输出的日志信息,这主要是因为没有开启spark的history server,我们只开启了hadoop的history server |
1 | [root@bigdata04 conf]# mv spark-defaults.conf.template spark-defaults.conf |
1 | 注意:在哪个节点上启动spark的historyserver进程,spark.yarn.historyServer.address的值里面就指定哪个节点的主机名信息 |
1 | 在spark-env.sh中增加以下内容 |
1 | 重新使用on yarn模式向集群提交任务,查看spark的任务界面 |

1 | 然后点击foreach链接 |

1 | 再点击stdout这里 |

1 | 最后就可以看到foreach输出的信息了 |
