大数据开发工程师-第五周 第二章 实战 WordCount


第五周 第二章 实战 WordCount

WordCount案例图解

HYGTED.md.png

1
2

```下面我们来看一个两个文件的执行流程

HYGIHO.md.png

HYJC5Q.md.png

WordCount案例开发

1
2
3
4
5
前面我们通过理论层面详细分析了单词计数的执行流程,下面我们就来实际上手操作一下。
大致流程如下:
第一步:开发Map阶段代码
第二步:开发Reduce阶段代码
第三步:组装Job

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package com.imooc.mc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* 需求:读取hdfs上的hello.txt文件,计算文件中各个单词出现次数
*
* 原始文件内容如下:
* hello you
* hello me
*
* 最终需要的结果形式:
* hello 2
* me 1
* you 1
*
*/

public class WordCountJob {
// 静态内部类
// map阶段
// 注意:myMapper和myReducer类可以提到外面去写,这里只是为了方便学习
public static class myMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
/**
* 需要实现map函数
* 这个函数就是可以接受<k1,v1>, 产生<k2,v2>
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
// k1代表每一行数据的行首偏移量,v1代表的是每一行的数据
// 需要做的是:把每一行数据的单词切割出来
String[] words = v1.toString().split(" ");
for (String word : words){
// 把切割出来的单词,封装成<k2, 1>
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
context.write(k2,v2);
}
//super.map(key, value, context);
}
}

/**
* reduce阶段
*/
public static class myReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
/**
* 对<k2, {v2...}>的数据进行累加求和,生成<k3,v3>
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
throws IOException, InterruptedException {
long sum = 0;
for(LongWritable v2:v2s){
sum += v2.get();
}
// Text k3 = k2
context.write(k2, new LongWritable(sum));
// super.reduce(key, values, context);
}
}
/**
* 组装job=map+reduce
*/
public static void main(String[] args){
try {
if(args.length!=2){
System.exit(100);
System.out.println("缺少路径参数!!!");
}
// 指定Job需要的配置参数
Configuration conf = new Configuration();
// 创建一个Job
Job job = Job.getInstance(conf);
// 注意:这一行必须设置,否者在集群中执行时找不到WordCountJob这个类
job.setJarByClass(WordCountJob.class);

// 指定输入路径,可以是文件也可以是目录(目录里只有一个文件时可以); 注意FileInputFormat别选成hadoop1.x的了
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 指定输出路径(只能是hdfs上一个不存在的目录); 注意FileOutFormat别选成hadoop1.x的了
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 指定map相关代码
job.setMapperClass(myMapper.class);
// 指定k2类型
job.setMapOutputKeyClass(Text.class);
// 指定v2类型
job.setMapOutputValueClass(LongWritable.class);

//指定reduce相关代码
job.setReducerClass(myReducer.class);
// 指定k3类型
job.setOutputKeyClass(Text.class);
// 指定v3类型
job.setOutputValueClass(LongWritable.class);

// 提交job
job.waitForCompletion(true);

}catch(Exception e){
e.printStackTrace();
}

}
}
1
现在代码开发完毕了,现在我们是把自定义的mapper类和reducer类都放到了这个WordCountJob类中,主要是为了在学习阶段看起来清晰一些,所有的代码都在一个类中,好找,其实我们完全可以把自定义的mapper类和reducer类单独提出去,定义为单独的类,是没有什么区别的。

pom文件

1
2
ok,那代码开发好了以后想要执行,我们需要打jar包上传到集群上去执行,这个时候需要在pom文件中
添加maven的编译打包插件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<build>
<plugins>
<!-- compiler插件, 设定JDK版本 -->
<!--编译插件-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
<showWarnings>true</showWarnings>
</configuration>
</plugin>

<!--打包插件,可以打带依赖的jar包-->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!--这里可以不写入口类,可以动态指定-->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
1
2
3
注意了,这些添加完以后还有一个地方需要修改,需要在pom中的hadoop-client和log4j依赖中增加
scope属性,值为provided,表示只在编译的时候使用这个依赖,在执行以及打包的时候都不使用,因为
hadoop-client和log4j依赖在集群中都是有的,所以在打jar包的时候就不需要打进去了,如果我们使用到 了集群中没有的第三方依赖包就不需要增加这个provided属性了,不增加provided就可以把对应的第三方依赖打进jar包里面了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
完整的pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>bigdata_hadoop</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!--hadoop的依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.2</version>
<!-- provided表示这个依赖只在编译的时候,执行或者打jar的时候都不使用; 执行其它需要这些以来的程序时,要去掉-->
<scope>provided</scope>
</dependency>

<!--log4j的依赖-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
<!-- provided表示这个依赖只在编译的时候,执行或者打jar的时候都不使用; 执行其它需要这些以来的程序时,要去掉-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
<!-- provided表示这个依赖只在编译的时候,执行或者打jar的时候都不使用; 执行其它需要这些以来的程序时,要去掉-->
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- compiler插件, 设定JDK版本 -->
<!--编译插件-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
<showWarnings>true</showWarnings>
</configuration>
</plugin>

<!--打包插件,可以打带依赖的jar包-->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!--这里可以不写入口类,可以动态指定-->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>

打jar包

1
2
3
4
添加好了以后就可以打包了,建议在windows的cmd命令行下cd到项目根目录,然后执行mvn编译打包
命令,看到最后输出的BUILD SUCCESS就说明执行成功了

D:\IdeaProjects\db_hadoop>mvn clean package -DskipTests
1
2
3
命令执行成功之后,就可以到target目录下获取对应的jar包了,需要使用jar-with-dependencies结尾的那个jar包。
D:\IdeaProjects\db_hadoop\target\db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar 把这个jar包上传到集群的任意一台机器上面或者是hadoop客户端机器上都可以,只要这台机器可以和集群进行交互即可。
注意,这个jar包不能使用java -jar的方式执行,需要使用集群特有的执行方式

向集群提交MapReduce任务

1
2
3
在此之前需要做:
1.需要把jar包上传到linux
2.测试数据hello.txt上传到hdfs
1
hadoop jar bigdata_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mc.WordCountJob /test/WordCount /test/WordCount/outcome
1
2
3
4
5
6
7
8
hadoop:表示使用hadoop脚本提交任务,其实在这里使用yarn脚本也是可以的,从hadoop2开始支持使用yarn,不过也兼容hadoop1,也继续支持使用hadoop脚本,所以在这里使用哪个都可以,
具体就看你个人的喜好了,我是习惯于使用hadoop脚本
jar:表示执行jar包
db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar:指定具体的jar包路径信息
com.imooc.mr.WordCountJob:指定要执行的mapreduce代码的全路径
/test/hello.txt:指定mapreduce接收到的第一个参数,代表的是输入路径,这里的输入路径可以直
接指定hello.txt的路径,也可以直接指定它的父目录,因为它的父目录里面也没有其它无关的文件,如果指定目录的话就意味着hdfs会读取这个目录下所有的文件,所以后期如果我们需要处理一批文件,那就可以把他们放到同一个目录里面,直接指定目录即可。
/out:指定mapreduce接收到的第二个参数,代表的是输出目录,这里的输出目录必须是不存在的,MapReduce程序在执行之前会检测这个输出目录,如果存在会报错,因为它每次执行任务都需要一个新的输出目录来存储结果数据
1
2
3
4
5
输出目录中,_SUCCESS是一个标记文件,有这个文件表示这个任务执行成功了。
part-r-00000是具体的数据文件,如果有多个reduce任务会产生多个这种文件,多个文件的话会按照从0
开始编号,00001,00002等等

还要一点需要注意的,part后面的r表示这个结果文件是reduce步骤产生的,如果一个mapreduce只有map阶段没有reduce阶段,那么产生的结果文件是part-m-00000这样的

1

1
可以在bigdata:8088里看一些此次提交任务的相关信息

本文标题:大数据开发工程师-第五周 第二章 实战 WordCount

文章作者:TTYONG

发布时间:2022年02月09日 - 18:02

最后更新:2023年06月05日 - 00:06

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E4%BA%94%E5%91%A8-%E7%AC%AC%E4%BA%8C%E7%AB%A0-%E5%AE%9E%E6%88%98-WordCount.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%