大数据开发工程师-第十周 第3章 spark实战: 单词统计


第十周 第3章 spark实战: 单词统计

WordCount程序

1
2
3
4
首先看来一个快速入门案例,单词计数
这个需求就是类似于我们在学习MapReduce的时候写的案例
需求这样的:读取文件中的所有内容,计算每个单词出现的次数
这个需求就没什么好分析的了,咱们之前在学习MapReduce的已经分析过了,接下来就来看一下使用Spark需要如何实现
1
注意:由于Spark支持Java、Scala这些语言,目前在企业中大部分公司都是使用Scala语言进行开发,个别公司会使用java进行开发,为了加深大家对Spark的理解,也满足java老程序员的需求,针对本课程中的案例,我们都会先基于Scala代码进行详细的讲解,然后再使用java代码重新实现一遍

Scala代码开发

maven项目的创建

1
2
3
4
5
6
7
8
9
10
下面来创建一个maven项目,集成java和scala的sdk

此时项目中默认带有java的sdk,需要添加scala的sdk

接下来在项目的src/main目录下增加scala目录

需要将新增的scala目录设置为root source
(以上在scala学习时学过了)

开发环境配置完毕

image-20230324114959465

image-20230324115043947

image-20230324115105527

image-20230324115123507

1
2
3
4
5
最后需要添加Spark的maven依赖

注意:由于目前我们下载的spark的安装包中使用的scala是2.11的,所以在这里要选择对应的scala 2.11版本的依赖。

spark这个版本也有对应scala2.12版本的安装包,不过那个安装包里面没有包含hadoop的依赖。

image-20230324120638011

image-20230324121100234

image-20230324121034982

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
</dependency>
1
2
3
4
5
6
注意:我们现在引入的Scala SDK是2.12版本的,建议大家再新增一个2.11版本的scala SDK。

下载scala-2.11.12版本

解压到 D:\scala 目录下
然后添加到idea中

scala代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.imooc.scala
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:单词计数
* Created by xuwei
*/
object WordCountScala {
def main(args: Array[String]): Unit = {
//第一步:创建SparkContext
val conf = new SparkConf()
conf.setAppName("WordCountScala")//设置任务名称
.setMaster("local")//local表示在本地执行,本地集群,主节点名称;这里也可以不使用链式调用
val sc = new SparkContext(conf)
//第二步:加载数据
val linesRDD = sc.textFile("D:\\hello.txt")
//第三步:对数据进行切割,把一行数据切分成一个一个的单词
val wordsRDD = linesRDD.flatMap(_.split(" "))
//第四步:迭代words,将每个word转化为(word,1)这种形式 //等同于line=>line.split(" ")
val pairRDD = wordsRDD.map((_,1))
//.map(word=>(word,1))
//第五步:根据key(其实就是word)进行分组聚合统计
val wordCountRDD = pairRDD.reduceByKey(_ + _)
//第六步:将结果打印到控制台
// 执行到foreach操作,才会去真正执行前面的所有transformation操作;任务才开始真正执行运算;如果没有这一行auction算子,任务不会执行
wordCountRDD.foreach(wordCount=>println(wordCount._1+"--"+wordCount._2))
//第七步:停止SparkContext
sc.stop()
}
}

//链式调用
linesRDD.flatMap(...).map(...).reduceByKey(...).foreach(...)
1
这里master设置的是local表示直接在本地临时创建一个集群(方便调试),就不用打包放到linux上spark集群运行
1
2
3
4
执行代码,结果如下
you--1
hello--2
me--1

总结

1
2
3
4
5
6
我们再来总结一下代码中这几个RDD中的数据结构
val linesRDD = sc.textFile("D:\\hello.txt")

linesRDD中的数据是这样的:
hello you
hello me
1
2
3
4
5
6
7
val wordsRDD = linesRDD.flatMap(_.split(" "))

wordsRDD中的数据是这样的:
hello
you
hello
me
1
2
3
4
5
6
7
val pairRDD = wordsRDD.map((_,1))

pairRDD 中的数据是这样的
(hello,1)
(you,1)
(hello,1)
(me,1)
1
2
3
4
5
6
val wordCountRDD = pairRDD.reduceByKey(_ + _)

wordCountRDD 中的数据是这样的
(hello,2)
(you,1)
(me,1)

java代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.imooc.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
* 需求:单词计数
* Created by xuwei
*/
public class WordCountJava {
public static void main(String[] args) {
//第一步:创建SparkContext:
//注意,针对java代码需要获取JavaSparkContext
SparkConf conf = new SparkConf();
conf.setAppName("WordCountJava")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//第二步:加载数据
JavaRDD<String> linesRDD = sc.textFile("D:\\hello.txt");
//第三步:对数据进行切割,把一行数据切分成一个一个的单词
//注意:FlatMapFunction的泛型,第一个参数表示输入数据类型,第二个表示是输出数据类型
JavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String,String>(){
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
});

//第四步:迭代words,将每个word转化为(word,1)这种形式
//注意:PairFunction的泛型,第一个参数是输入数据类型
//第二个是输出tuple中的第一个参数类型,第三个是输出tuple中的第二个参数类型
//注意:如果后面需要使用到....ByKey,前面都需要使用mapToPair去处理
JavaPairRDD<String, Integer> pairRDD = wordRDD.mapToPair(new PairFunction<String,String,Integer>(){
public Tuple2<String, Integer> call(String word) throws Exception{
return new Tuple2<String, Integer>(word, 1);
}
});

//第五步:根据key(其实就是word)进行分组聚合统计
JavaPairRDD<String, Integer> wordCountRDD = pairRDD.reduceByKey(new Function2<Integer,Integer,Integer>(){
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});

//第六步:将结果打印到控制台
wordCountRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
public void call(Tuple2<String, Integer> tup) throws Exception {
System.out.println(tup._1+"--"+tup._2);
}
});


//第七步:停止sparkContext
sc.stop();
}

总结

1
2
3
4
5
6
在编写一行JAVA语句时,有返回值的方法已经决定了返回对象的类型和泛型类型,我们只需要给这个对象起个名字就行。

如果使用快捷键生成这个返回值,我们就可以减少不必要的打字和思考,专注于过程的实现。

1.把光标移动到需要生成返回值变量的语句之前
2.右键Refactor-Extract-Variable,也可以按快捷键ctrl+alt+v

url

任务提交

1
针对任务的提交有这么几种形式

使用idea

1
2
直接在idea中执行,方便在本地环境调试代码
咱们刚才使用的就是这种方式

使用spark-submit

添加打包所用依赖

1
2
3
4
5
使用spark-submit提交到集群执行,实际工作中会使用这种方式
那接下来我们需要把我们的代码提交到集群中去执行

这个时候就需要对代码打包了
首先在项目的pom文件中添加build配置,和dependencies标签平级
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打包插件 -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
1
注意:这里面的scala版本信息要使用2.11,因为这个spark安装包中依赖的scala是2.11的。
1
2
3
4
5
6
7
8
然后把spark-core依赖的作用域设置为provided(因为这是提交到集群运行,spark已经有了spark核心的jar包)

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>

修改代码

1
2
3
4
修改代码中的输入文件路径信息,因为这个时候无法读取windows中的数据了,把代码修改成动态接收输入文件路径

还需要将 setMaster("local") 注释掉,后面我们会在提交任务的时候动态指定master信息
修改WordCountScala
1
2
3
4
5
6
7
8
9
10
11
//第一步:创建SparkContext
val conf = new SparkConf()
conf.setAppName("WordCountScala")//设置任务名称
//.setMaster("local")//local表示在本地执行
val sc = new SparkContext(conf);
//第二步:加载数据
var path = "D:\\hello.txt"
if(args.length==1){
path = args(0)
}
val linesRDD = sc.textFile(path
1
2
3
4
5
6
7
8
9
10
11
12
13
14
修改WordCountJava

//第一步:创建SparkContext:
//注意,针对java代码需要获取JavaSparkContext
SparkConf conf = new SparkConf();
conf.setAppName("WordCountJava");
//.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//第二步:加载数据
String path = "D:\\hello.txt";
if(args.length==1){
path = args[0];
}
JavaRDD<String> linesRDD = sc.textFile(path);

打包

1
mvn clean package -DskipTests

image-20230324145345028

上传,提交任务

2.4.3官方文档

1
2
3
4
5
6
7
8
9
10
11
# 提交任务格式
# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # can be client for client mode
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
1
注意:为了能够方便使用spark-submit脚本,需要在/etc/profile中配置SPARK_HOME环境变量
1
2
3
4
5
6
7
8
9
10
由于spark-submit命令后面的参数有点多,所以在这我们最好是写一个脚本去提交任务
[root@bigdata04 sparkjars]# vi wordCountJob.sh
spark-submit \(不写成一行时要这样写)
--class com.imooc.scala.WordCountScala \
--master yarn \
--deploy-mode client \(cluster也行)
--executor-memory 1G \
--num-executors 1 \
db_spark-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://bigdata01:9000/test/hello.txt
1
2
3
在提交任务之前还需要先把hadoop集群启动了,以及对应的historyserver进程(除了客户端节点,所有集群节点都需要启动)

bin/mapred --daemon start historyserver
1
2
3
4
5
[root@bigdata04 sparkjars]# sh -x wordCountJob.sh

此时任务会被提交到YARN集群中,可以看到任务执行成功了(8088)

但是注意:此时想要查看foreach中打印的结果目前是看不到的,需要通过spark自己的任务界面才能看到,现在只有在任务运行中的时候,我们才能进到spark的任务界面,任务执行结束之后我们就进不去了,一会我们再解决这个问题
1
2
这就是第二种方式,使用spark-submit的方式提交任务到集群中执行
其实spark-submit中还可以配置很多参数,具体等后面我们用到的时候再去详细分析,现在分析不太好理解

使用spark-shell

1
2
3
4
5
6
这种方式方便在集群环境中调试代码
有一些代码对环境没有特殊依赖的时候我们可以直接使用第一种方式,在idea中调试代码

但是有时候代码需要依赖线上的一些环境,例如:需要依赖线上的数据库中的数据,由于权限问题,我们在本地是无法连接的

这个时候想要调试代码的话,可以选择使用spark-shell的方式,直接在线上服务器中开启一个spark的交互式命令行窗口
1
注意:使用spark-shell的时候,也可以选择指定开启本地spark集群,或者连接standalone集群,或者使用on yarn模式,都是可以的

local

1
默认是使用local模式开启本地集群,在spark-shell命令行中sparkContex是已经创建好的了,可以直接通过sc使用

image-20230324151324720

1
输入需要调试执行的代码
1
2
3
4
5
6
7
8
9
10
11
12
scala> val linesRDD = sc.textFile("/test/hello.txt")
linesRDD: org.apache.spark.rdd.RDD[String] = /test/hello.txt MapPartitionsRDD
scala> val wordsRDD = linesRDD.flatMap(_.split(" "))
wordsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at
scala> val pairRDD = wordsRDD.map((_,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map
scala> val wordCountRDD = pairRDD.reduceByKey(_ + _)
wordCountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at red
scala> wordCountRDD.foreach(wordCount=>println(wordCount._1+"--"+wordCount._2))
you--1
hello--2
me--1
1
通过-h参数,可以查看spark-shell后面可以跟的参数

on yarn

1
2
3
我们尝试使用on yarn模式

saprk-shell --master yarn --deploy-mode client

standalone

1
如果想要使用spark-shell连接spark的standalone集群的话,只需要通过–master参数指定集群主节点的url信息即可。

Spark historyServer

1
2
3
4
5
6
刚才我们使用on yarn模式的时候会发现看不到输出的日志信息,这主要是因为没有开启spark的history server,我们只开启了hadoop的history server
任意选择一个服务器开启spark的history server进程都可以,选择集群内的节点也可以,选择spark的客户端节点也可以

下面我们就在这个spark的客户端节点上启动spark的history server进程
需要修改spark-defaults.conf和spark-env.sh
首先对 spark-defaults.conf.template重命名
1
2
3
4
5
6
7
8
9
[root@bigdata04 conf]# mv spark-defaults.conf.template spark-defaults.conf

然后在 spark-defaults.conf中增加以下内容
[root@bigdata04 conf]# vi spark-defaults.conf
spark.eventLog.enabled=true
spark.eventLog.compress=true
spark.eventLog.dir=hdfs://bigdata01:9000/tmp/logs/root/logs
spark.history.fs.logDirectory=hdfs://bigdata01:9000/tmp/logs/root/logs
spark.yarn.historyServer.address=http://bigdata04:18080
1
注意:在哪个节点上启动spark的historyserver进程,spark.yarn.historyServer.address的值里面就指定哪个节点的主机名信息
1
2
3
4
5
6
7
8
9
10
11
在spark-env.sh中增加以下内容

export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://bigdata01:9000/tmp/logs/root/logs"

启动
[root@bigdata04 spark-2.4.3-bin-hadoop2.7]# sbin/start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /data/soft

[root@bigdata04 spark-2.4.3-bin-hadoop2.7]# jps
2903 Jps
2862 HistoryServer
1
2
重新使用on yarn模式向集群提交任务,查看spark的任务界面
点击任务后面的history链接即可

image-20230324153528888

1
然后点击foreach链接

image-20230324153553898

1
再点击stdout这里

image-20230324153618631

1
最后就可以看到foreach输出的信息了

image-20230324153639832


本文标题:大数据开发工程师-第十周 第3章 spark实战: 单词统计

文章作者:TTYONG

发布时间:2022年03月02日 - 10:03

最后更新:2023年06月04日 - 15:06

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E5%91%A8-%E7%AC%AC3%E7%AB%A0-spark%E5%AE%9E%E6%88%98-%E5%8D%95%E8%AF%8D%E7%BB%9F%E8%AE%A1.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%