大数据开发工程师-第五周 第三章 深入 MapReduce


第五周 第三章 深入 MapReduce

MapReduce任务日志查看

1
2
3
4
5
如果想要查看mapreduce任务执行过程产生的日志信息怎么办呢?
是不是在提交任务的时候直接在这个控制台上就能看到了?先不要着急,我们先在代码中增加一些日志信息,在实际工作中做调试的时候这个也是很有必要的
在自定义mapper类的map函数中增加一个输出,将k1,v1的值打印出来

在自定义reducer类中的reduce方法中增加一个输出,将k2,v2和k3,v3的值打印出来

开启yarn的日志聚合功能方式

自定义日志

map函数修改
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// k1代表每一行数据的行首偏移量,v1代表的是每一行的数据
// 需要做的是:把每一行数据的单词切割出来
System.out.println("<k1, v1>: <"+k1.get()+", "+v1.toString()+">"); # 增加处
String[] words = v1.toString().split(" ");
for (String word : words){
// 把切割出来的单词,封装成<k2, 1>
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
context.write(k2,v2);
}
//super.map(key, value, context);
}
}
reduce函数修改
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
long sum = 0;
for(LongWritable v2:v2s){
System.out.println("<k2, v2>: <"+k2.toString()+", "+v2.get()+">"); # 增加处
sum += v2.get();
}
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
System.out.println("<k3, v3>: <"+k3.toString()+", "+v3.get()+">"); # 增加处
context.write(k3, v3);
// super.reduce(key, values, context);
}
}
打包,上传,提交任务到集群
结果查看
1
2
3
4
5
6
7
8
9
10
等待任务执行结束,我们发现在控制台上是看不到任务中的日志信息的,为什么呢?因为我们在这相当于是通过一个客户端把任务提交到集群里面去执行了,所以日志是存在在集群里面的。想要查看需要需要到一个特殊的地方查看这些日志信息
由于程序是提交到集群执行,所以在linux的终端上是看不到的

先进入到yarn的web界面,访问8088端口,点击对应任务的history链接(在此之前先把bigdata02,bigdata03与ip的映射在windows的hosts文件上配置好)
http://bigdata01:8088/

注意了,在这里我们发现这个链接是打不来的,
这里有两个原因:
第一个原因是没有windows的hosts文件中没有配置bigdata02和bigdata03这两个主机名和ip的映射关系,先去把这两个主机名配置到hosts文件里面,之前的bigdata01已经配置进去了
第二个原因就是这里必须要启动historyserver进程才可以,并且还要开启日志聚合功能,才能在web界面上直接查看任务对应的日志信息,因为默认情况下任务的日志是散落在nodemanager节点上的,想要查看需要找到对应的nodemanager节点上去查看,这样就很不方便,通过日志聚合功能我们可以把之前本来散落在nodemanager节点上的日志统一收集到hdfs上的指定目录中,这样就可以在yarn的web界面中直接查看了
打开日志聚合功能
1
2
3
4
那我们就来开启日志聚合功能。开启日志聚合功能需要修改yarn-site.xml的配置,增加
yarn.log-aggregation-enable和yarn.log.server.url这两个参数

注意:修改这个配置想要生效需要重启集群,每个节点都要修改
1
2
3
4
5
6
7
8
9
添加部分
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.log.server.url</name>
<value>http://bigdata01:19888/jobhistory/logs/</value>
</property>
1
2
3
4
5
6
7
8
9
10
11
12
13
启动historyserver进程,需要在集群的所有节点上都启动这个进程

[root@bigdata01 hadoop-3.2.0]# bin/mapred --daemon start historyserver
[root@bigdata01 hadoop-3.2.0]# jps
4232 SecondaryNameNode
5192 JobHistoryServer
4473 ResourceManager
3966 NameNode
5231 Jps

其它节点也需执行

重新再提交mapreduce任务
1
2
3
4
5
此时再进入yarn的8088界面,点击任务对应的history链接就可以打开了。
此时,点击对应map和reduce后面的链接就可以点进去查看日志信息了,点击map后面的数字1,可以
点击这个界面中的logs文字链接,可以查看详细的日志信息

最终可以在界面中看到很多日志信息,我们刚才使用sout输出的日志信息需要到Log Type: stdout这里来查看,在这里可以看到,k1和v1的值

HdADit.png

1
2
  想要查看reduce输出的日志信息需要到reduce里面查看,操作流程是一样的,可以看到k2,v2和k3,v3的

HdA0II.png

使用logger

1
咱们刚才的输出是使用syout输出的,这个其实是不正规的,标准的日志写法是需要使用logger进行输出的
修改map函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static class myMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(myMapper.class);
/**
* 需要实现map函数
* 这个函数就是可以接受<k1,v1>, 产生<k2,v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// k1代表每一行数据的行首偏移量,v1代表的是每一行的数据
// 需要做的是:把每一行数据的单词切割出来
logger.info("<k1, v1>: <"+k1.get()+", "+v1.toString()+">");
//System.out.println("<k1, v1>: <"+k1.get()+", "+v1.toString()+">");
String[] words = v1.toString().split(" ");
for (String word : words){
// 把切割出来的单词,封装成<k2, 1>
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
context.write(k2,v2);
}
//super.map(key, value, context);
}
}
修改reduce函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static class myReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(myReducer.class); #!!!
/**
* 对<k2, {v2...}>的数据进行累加求和,生成<k3,v3>
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
long sum = 0;
for(LongWritable v2:v2s){
logger.info("<k2, v2>: <"+k2.toString()+", "+v2.get()+">"); #!!!
//System.out.println("<k2, v2>: <"+k2.toString()+", "+v2.get()+">");
sum += v2.get();
}
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
logger.info("<k3, v3>: <"+k3.toString()+", "+v3.get()+">"); #!!!
//System.out.println("<k3, v3>: <"+k3.toString()+", "+v3.get()+">");
context.write(k3, v3);
// super.reduce(key, values, context);
}
}
打包,上传,提交任务到集群
1
重新编译打包上传,重新提交最新的jar包,这个时候再查看日志就需要到Log Type: syslog中查看日志了。

HdeR10.md.png

Hde2pq.md.png

1
这是工作中比较常用的查看日志的方式,但是还有一种使用命令查看的方式,这种方式面试的时候一般喜欢问

面试爱问的日志查看方式(面试)

1
2
3
4
 yarn logs -applicationId application_158771356 | grep k1,v1

注意:后面指定的是任务id,任务id可以到yarn的web界面上查看。
执行这个命令可以看到很多的日志信息,我们通过grep筛选一下日志
1
这种方式也需要大家能够记住并且掌握住,首先是面试的时候可能会问到,还有就是针对某一些艰难的场景下,无法使用yarn的web界面查看日志,就需要使用yarn logs命令了

停止Hadoop集群中的任务

1
2
3
4
5
6
  如果一个mapreduce任务处理的数据量比较大的话,这个任务会执行很长时间,可能几十分钟或者几个小时都有可能,假设一个场景,任务执行了一半了我们发现我们的代码写的有问题,需要修改代码重新提交执行,这个时候之前的任务就没有必要再执行了,没有任何意义了,最终的结果肯定是错误的,所以我们就想把它停掉,要不然会额外浪费集群的资源,如何停止呢?
我在提交任务的窗口中按ctrl+c是不是就可以停止?
注意了,不是这样的,我们前面说过,这个任务是提交到集群执行的,你在提交任务的窗口中执行ctrl+c对已经提交到集群中的任务是没有任何影响的。
我们可以验证一下,执行ctrl+c之后你再到yarn的8088界面查看,会发现任务依然存在。
所以需要使用hadoop集群的命令去停止正在运行的任务
使用yarn application -kill命令,后面指定任务id即可
1
[root@bigdata01 hadoop-3.2.0]# yarn application -kill application_15877135678

HdGYSe.md.png

HdlgVf.md.png

MapReduce程序扩展

1
2
3
4
  咱们前面说过MapReduce任务是由map阶段和reduce阶段组成的但是我们也说过,reduce阶段不是必须的,那也就意味着MapReduce程序可以只包含map阶段。
什么场景下会只需要map阶段呢?
当数据只需要进行普通的过滤、解析等操作,不需要进行聚合,这个时候就不需要使用reduce阶段了,在代码层面该如何设置呢?
很简单,在组装Job的时候设置reduce的task数目为0就可以了。并且Reduce代码也不需要写了。

map阶段编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static class myMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
Logger logger = LoggerFactory.getLogger(myMapper.class);
/**
* 需要实现map函数
* 这个函数就是可以接受<k1,v1>, 产生<k2,v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// k1代表每一行数据的行首偏移量,v1代表的是每一行的数据
// 需要做的是:把每一行数据的单词切割出来
logger.info("<k1, v1>: <"+k1.get()+", "+v1.toString()+">");
//System.out.println("<k1, v1>: <"+k1.get()+", "+v1.toString()+">");
String[] words = v1.toString().split(" ");
for (String word : words){
// 把切割出来的单词,封装成<k2, 1>
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
context.write(k2,v2);
}
//super.map(key, value, context);
}
}

main编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args){
try {
if(args.length!=2){
System.exit(100);
System.out.println("缺少路径参数!!!");
}
// 指定Job需要的配置参数
Configuration conf = new Configuration();
// 创建一个Job
Job job = Job.getInstance(conf);
// 注意:这一行必须设置,否者在集群中执行时找不到WordCountJob这个类
job.setJarByClass(WordCountJobNoReduce.class);

// 指定输入路径,可以是文件也可以是目录(目录里只有一个文件时可以); 注意FileInputFormat别选成hadoop1.x的了
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 指定输出路径(只能是hdfs上一个不存在的目录); 注意FileOutFormat别选成hadoop1.x的了
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 指定map相关代码
job.setMapperClass(myMapper.class);
// 指定k2类型
job.setMapOutputKeyClass(Text.class);
// 指定v2类型
job.setMapOutputValueClass(LongWritable.class);

// 禁用reduce
job.setNumReduceTasks(0);

// 提交job
job.waitForCompletion(true);

}catch(Exception e){
e.printStackTrace();
}

}

打包,上传,提交到集群,结果查看

HdtUOJ.png

1
2
这里发现map执行到100%以后任务就执行成功了,reduce还是0%,因为就没有reduce阶段了。
查看输出结果,注意,这里的文件名就是part-m-00000了

HdtNy4.png

HdttlF.png


本文标题:大数据开发工程师-第五周 第三章 深入 MapReduce

文章作者:TTYONG

发布时间:2022年02月09日 - 18:02

最后更新:2023年06月04日 - 15:06

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E4%BA%94%E5%91%A8-%E7%AC%AC%E4%B8%89%E7%AB%A0-%E6%B7%B1%E5%85%A5-MapReduce.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%