Hadoop Lab Course: MapReduce


MapReduce Experiments

[Figure: image-20230605001153532]

Case 1: WordCount

Preparation

Importing the JAR packages

1. Copy the MapReduce JAR packages into the lib directory of the Hadoop project in Eclipse:

/usr/hadoop/…/share/mapreduce/

/home/…/hadoop/lib/

2. In Eclipse, right-click the project and choose Build Path -> Configure .. -> Libraries -> Add External JARs.

Writing the map() and reduce() functions for WordCount

map()
package sudo.edu.hadoop.mapreduce.wordCount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class wordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// Split the line into words and emit (word, 1) for each word
String data = value.toString();
String[] words = data.split(" ");
for(String w: words) {
context.write(new Text(w), new IntWritable(1));
}
}
}
reduce()
package sudo.edu.hadoop.mapreduce.wordCount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class wordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

@Override
protected void reduce(Text k3, Iterable<IntWritable> v3,
Context context) throws IOException, InterruptedException {
// Sum all counts for the word
int total = 0;
for(IntWritable v : v3) {
total += v.get();
}
context.write(k3, new IntWritable(total));
}
}
main()
package sudo.edu.hadoop.mapreduce.wordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class wordCountMain {
public static void main(String[] args) throws Exception{
//create the job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(wordCountMain.class);
//specify the mapper and its output types (k2, v2)
job.setMapperClass(wordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//specify the reducer and its output types (k4, v4)
job.setReducerClass(wordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//specify the input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
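To run the job on the cluster, export the project as a JAR and submit it the same way as the later examples. The JAR name and HDFS paths below are only placeholders; if the JAR's manifest does not name a main class, pass wordCountMain's fully qualified name as the first argument.

hadoop jar wordcount.jar sudo.edu.hadoop.mapreduce.wordCount.wordCountMain /input/data.txt /output/wc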

Case 2: Total salary per department

[Figure: image-20230605003348442]

[Figure: image-20230605002829991]

7369,SMITH,CLERK,7902,1980/12/17,800,,20 
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10

mapper

package saltotal; 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k1 v1 k2 v2
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable,
IntWritable> {
@Override
protected void map(LongWritable key1, Text value1,Context context)
throws IOException, InterruptedException {
//Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//Split into fields
String[] words = data.split(",");
//Output: k2 = department number, v2 = salary
context.write(new IntWritable(Integer.parseInt(words[7])),
new IntWritable(Integer.parseInt(words[5]))); //salary
}
}

reducer

package saltotal; 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// k3 v3 k4 v4
public class SalaryTotalReducer extends Reducer<IntWritable, IntWritable, IntWritable,
IntWritable> {
@Override
protected void reduce(IntWritable k3, Iterable<IntWritable> v3,Context context)
throws IOException, InterruptedException {
//Sum v3 to get the department's total salary
int total = 0;
for(IntWritable v:v3){
total += v.get();
}
//Output: department number, total salary
context.write(k3, new IntWritable(total));
}
}

Main program entry point

package saltotal; 
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SalaryTotalMain {
public static void main(String[] args) throws Exception {
//create the job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(SalaryTotalMain.class);
//specify the job's mapper and its output types k2 v2
job.setMapperClass(SalaryTotalMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
//specify the job's reducer and its output types k4 v4
job.setReducerClass(SalaryTotalReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//specify the job's input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//run the job
job.waitForCompletion(true);
}
}

Case 3: Serialization and deserialization

Serialization is a technique for turning an in-memory Java object into something that can be stored in a file or transmitted as a data stream between machines. Java objects held only in memory are lost when the machine powers off, and in a distributed system an object often has to be passed to another machine for further computation, so the object must be converted into a file or a transmittable stream. This is Java serialization. The most common way to serialize in Java is to implement the java.io.Serializable interface.

public class Student implements java.io.Serializable{
}
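As a quick illustration (not part of the lab code), a Serializable object such as Student can be written to a local file with ObjectOutputStream; the class name SerializeDemo and the file name are arbitrary.

import java.io.FileOutputStream;
import java.io.ObjectOutputStream;

public class SerializeDemo {
public static void main(String[] args) throws Exception {
Student s = new Student();
//write the object to a local file; ObjectInputStream.readObject() would restore it
try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("student.dat"))) {
out.writeObject(s);
}
}
}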
Hadoop serialization, by contrast, is done by implementing the org.apache.hadoop.io.Writable interface, which declares the two methods readFields() and write(), shown below.
public interface Writable {
/**
* Serialize the fields of this object to <code>out</code>.
*
* @param out <code>DataOuput</code> to serialize this object into.
* @throws IOException
*/
void write(DataOutput out) throws IOException;
/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deseriablize this object from.
* @throws IOException
*/
void readFields(DataInput in) throws IOException;
}
Every key and value that flows through MapReduce (k1, v1, k2, v2, k3, v3, k4, v4) has to be serializable, which means its type must implement the Writable interface and provide readFields() and write().
This case refines Case 2: v2 is now the whole employee record, of type Employee.
The corresponding code is shown below.

Employee

package sudo.edu.hadoop.mapreduce.employeeDemo.serializable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
//Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30


public class Employee implements Writable{

private int empno;
private String ename;
private String job;
private int mgr;
private String hiredate;
private int sal;
private int comm;
private int deptno;

@Override
public void readFields(DataInput input) throws IOException {
// Deserialization: read the fields in the same order they were written
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}


@Override
public void write(DataOutput output) throws IOException {
// Serialization
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}

public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
}

Mapper

package sudo.edu.hadoop.mapreduce.employeeDemo.serializable;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// k1 v1 k2 v2
public class salaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
//Employee object and key, created once and reused for every record
Employee e = new Employee();
IntWritable k2 = new IntWritable();
@Override
protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
//Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//Split into fields
String[] words = data.split(",");

//Set the employee's attributes
//employee number
e.setEmpno(Integer.parseInt(words[0]));
//name
e.setEname(words[1]);
//job title
e.setJob(words[2]);
//manager number (note: it may be missing)
try {
e.setMgr(Integer.parseInt(words[3]));
} catch (Exception ex) {
//no manager number
e.setMgr(-1);
}
//hire date
e.setHiredate(words[4]);
//monthly salary
e.setSal(Integer.parseInt(words[5]));
//commission (note: it may also be missing)
try {
e.setComm(Integer.parseInt(words[6]));
} catch (Exception ex) {
//no commission
e.setComm(0);
}
//department number
e.setDeptno(Integer.parseInt(words[7]));


k2.set(e.getDeptno());
//Output: k2 = department number, v2 = employee object
context.write(k2,e);
}
}

Reducer

package sudo.edu.hadoop.mapreduce.employeeDemo.serializable;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class salaryTotalReducer extends Reducer<IntWritable, Employee, IntWritable, IntWritable>{
IntWritable value = new IntWritable();
@Override
protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)
throws IOException, InterruptedException {
//Sum v3 to get the department's total salary
int total = 0;
for(Employee v:v3){
total += v.getSal();
}
value.set(total);
//Output: department number, total salary
context.write(k3, value);
}
}

Main

package sudo.edu.hadoop.mapreduce.employeeDemo.serializable;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class salaryTotalMain {
public static void main(String[] args) throws Exception {
//create the job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(salaryTotalMain.class);
//specify the job's mapper and its output types k2 v2
job.setMapperClass(salaryTotalMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Employee.class);
//specify the job's reducer and its output types k4 v4
job.setReducerClass(salaryTotalReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//specify the job's input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//run the job
job.waitForCompletion(true);
}
}

Partitioning

The process of dividing the Mapper's output among the reducers is called partitioning (Partition), and the class that implements the division is called a Partitioner.

The default Partitioner in MapReduce is HashPartitioner.
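For reference, HashPartitioner simply maps a key to a reduce task by its hashCode; the sketch below reflects the standard implementation in org.apache.hadoop.mapreduce.lib.partition.

public class HashPartitioner<K, V> extends Partitioner<K, V> {
@Override
public int getPartition(K key, V value, int numReduceTasks) {
// mask off the sign bit so the result is non-negative, then take the modulus
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}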

partitioner

package sudo.edu.hadoop.mapreduce.employee.partitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/*
* A custom partitioning rule: partition by the employee's department number,
* based on the Map output k2 v2
*/
public class salaryTotalPatitioner extends Partitioner<IntWritable, Employee>{
/*
* The numPartition parameter is the number of partitions (reduce tasks)
*/
@Override
public int getPartition(IntWritable k2, Employee v2, int numPartition) {
// Decide which partition the record goes to
if(v2.getDeptno() == 10){
//goes to partition 1
return 1%numPartition;
}else if(v2.getDeptno() == 20){
//goes to partition 2
return 2%numPartition;
}else{
//goes to partition 0 (3 % numPartition == 0 when there are 3 reducers)
return 3%numPartition;
}
}
}

Employee

package sudo.edu.hadoop.mapreduce.employee.partitioner;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
//Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30


public class Employee implements Writable{

private int empno;
private String ename;
private String job;
private int mgr;
private String hiredate;
private int sal;
private int comm;
private int deptno;

@Override
public void readFields(DataInput input) throws IOException {
// Deserialization: read the fields in the same order they were written
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}


@Override
public void write(DataOutput output) throws IOException {
// Serialization
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}

public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
}

Mapper

package sudo.edu.hadoop.mapreduce.employee.partitioner;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// k1 v1 k2 v2
public class salaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
//Employee object and key, created once and reused for every record
Employee e = new Employee();
IntWritable k2 = new IntWritable();
@Override
protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
//Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//Split into fields
String[] words = data.split(",");

//Set the employee's attributes
//employee number
e.setEmpno(Integer.parseInt(words[0]));
//name
e.setEname(words[1]);
//job title
e.setJob(words[2]);
//manager number (note: it may be missing)
try {
e.setMgr(Integer.parseInt(words[3]));
} catch (Exception ex) {
//no manager number
e.setMgr(-1);
}
//hire date
e.setHiredate(words[4]);
//monthly salary
e.setSal(Integer.parseInt(words[5]));
//commission (note: it may also be missing)
try {
e.setComm(Integer.parseInt(words[6]));
} catch (Exception ex) {
//no commission
e.setComm(0);
}
//department number
e.setDeptno(Integer.parseInt(words[7]));


k2.set(e.getDeptno());
//Output: k2 = department number, v2 = employee object
context.write(k2,e);
}
}

Reducer

package sudo.edu.hadoop.mapreduce.employee.partitioner;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class salaryTotalReducer extends Reducer<IntWritable, Employee, IntWritable, IntWritable>{
IntWritable value = new IntWritable();
@Override
protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)
throws IOException, InterruptedException {
//Sum v3 to get the department's total salary
int total = 0;
for(Employee v:v3){
total += v.getSal();
}
value.set(total);
//Output: department number, total salary
context.write(k3, value);
}
}

Main

package sudo.edu.hadoop.mapreduce.employee.partitioner;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class salaryTotalMain {
public static void main(String[] args) throws Exception {
//create the job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(salaryTotalMain.class);
//specify the job's mapper and its output types k2 v2
job.setMapperClass(salaryTotalMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Employee.class);
// specify the partitioning rule and how many partitions (reduce tasks) to use
job.setPartitionerClass(salaryTotalPatitioner.class);
job.setNumReduceTasks(3);
//specify the job's reducer and its output types k4 v4
job.setReducerClass(salaryTotalReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//specify the job's input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//run the job
job.waitForCompletion(true);
}
}

Results

[Screenshot: YntMjJ.md.png]

[Screenshot: YntKc4.md.png]

Combining

Sorting

When a type needs both sorting and serialization, implement WritableComparable directly; this interface extends both the Writable and Comparable interfaces.

Main

package sudo.edu.hadoop.mapreduce.employee.sort;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class EmployeeSortMain {
public static void main(String[] args) throws Exception {
//create the job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(EmployeeSortMain.class);
//specify the job's mapper and its output types k2 v2
job.setMapperClass(EmployeeSortMapper.class);
job.setMapOutputKeyClass(Employee.class);
job.setMapOutputValueClass(NullWritable.class);
//specify the job's input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//run the job
job.waitForCompletion(true);
}
}

Mapper

package sudo.edu.hadoop.mapreduce.employee.sort;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class EmployeeSortMapper extends Mapper<LongWritable, Text, Employee,
NullWritable> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
//Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//Split into fields
String[] words = data.split(",");
//Create an employee object
Employee e = new Employee();
//Set the employee's attributes
//employee number
e.setEmpno(Integer.parseInt(words[0]));
//name
e.setEname(words[1]);
//job title
e.setJob(words[2]);
//manager number (note: it may be missing)
try{
e.setMgr(Integer.parseInt(words[3]));
}catch(Exception ex){
//no manager number
e.setMgr(-1);
}
//hire date
e.setHiredate(words[4]);
//monthly salary
e.setSal(Integer.parseInt(words[5]));
//commission (note: it may also be missing)
try{
e.setComm(Integer.parseInt(words[6]));
}catch(Exception ex){
//no commission
e.setComm(0);
}
//department number
e.setDeptno(Integer.parseInt(words[7]));
//Output: k2 = employee object, v2 = NullWritable
context.write(e, NullWritable.get());
}
}

Employee

package sudo.edu.hadoop.mapreduce.employee.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
//1. To use Employee as key2, it must be serializable
//2. The Employee class represents an employee and can be sorted
//Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements WritableComparable<Employee>{
private int empno;
private String ename;
private String job;
private int mgr;
private String hiredate;
private int sal;
private int comm;
private int deptno;
@Override
public String toString() {
return "Employee [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ",
deptno=" + deptno + "]";
}
@Override
public int compareTo(Employee o) {
// Sorting on multiple columns: select * from emp order by deptno,sal;
//First compare by deptno
if(this.deptno > o.getDeptno()){
return 1;
}else if(this.deptno < o.getDeptno()){
return -1;
}
//If deptno is equal, compare by sal
if(this.sal >= o.getSal()){
return 1;
}else{
return -1;
}
}
@Override
public void readFields(DataInput input) throws IOException {
// Deserialization: read the fields in the same order they were written
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
@Override
public void write(DataOutput output) throws IOException {
// Serialization
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
}

MapReduce programming examples

Sorting

Task: list employee salaries, sorted in ascending order by department and then by salary.
If the key belongs to a custom class and you want those keys sorted in a particular way, the class must implement Java's Comparable interface; it also has to implement Hadoop's Writable serialization interface (or, more simply, implement WritableComparable directly).
Here the employee attributes are wrapped in an Employee class that implements WritableComparable and sorts by department and then by salary.
No Reducer is needed this time. Because the key class supplies its own compareTo(), the framework sorts by it automatically; job.setSortComparatorClass() only needs to be called in the driver if you want to plug in a separate comparator.
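If you do want to override the sort order without touching Employee.compareTo(), one option (an illustrative sketch, not part of the original lab code; the class name is an assumption) is a WritableComparator registered through job.setSortComparatorClass():

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class EmployeeSalaryComparator extends WritableComparator {
protected EmployeeSalaryComparator() {
super(Employee.class, true); //true: deserialize keys so compare() receives Employee objects
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
//sort by salary only, ignoring the department
return Integer.compare(((Employee) a).getSal(), ((Employee) b).getSal());
}
}
//in the driver: job.setSortComparatorClass(EmployeeSalaryComparator.class);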

Mapper class

package sort.object; 
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class EmployeeSortMapper extends Mapper<LongWritable, Text, Employee,
NullWritable> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
//Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//Split into fields
String[] words = data.split(",");
//Create an employee object
Employee e = new Employee();
//Set the employee's attributes
//employee number
e.setEmpno(Integer.parseInt(words[0]));
//name
e.setEname(words[1]);
//job title
e.setJob(words[2]);
//manager number (note: it may be missing)
try{
e.setMgr(Integer.parseInt(words[3]));
}catch(Exception ex){
//no manager number
e.setMgr(-1);
}
//hire date
e.setHiredate(words[4]);
//monthly salary
e.setSal(Integer.parseInt(words[5]));
//commission (note: it may also be missing)
try{
e.setComm(Integer.parseInt(words[6]));
}catch(Exception ex){
//no commission
e.setComm(0);
}
//department number
e.setDeptno(Integer.parseInt(words[7]));
//Output: k2 = employee object, v2 = NullWritable
context.write(e, NullWritable.get());
}
}
The get() method returns the singleton instance of NullWritable. NullWritable is a special Writable whose implementation is empty: it reads nothing from the stream and writes nothing to it, serving purely as a placeholder. In MapReduce, if you do not need the key or the value, you can declare it as NullWritable. It is an immutable singleton type.

Employee class

package sort.object; 
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
//1. To use Employee as key2, it must be serializable
//2. The Employee class represents an employee and can be sorted
//Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements WritableComparable<Employee>{
private int empno;
private String ename;
private String job;
private int mgr;
private String hiredate;
private int sal;
private int comm;
private int deptno;
@Override
public String toString() {
return "Employee [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ",
deptno=" + deptno + "]";
}
@Override
public int compareTo(Employee o) {
// Sorting on multiple columns: select * from emp order by deptno,sal;
//First compare by deptno
if(this.deptno > o.getDeptno()){
return 1;
}else if(this.deptno < o.getDeptno()){
return -1;
}
//If deptno is equal, compare by sal
if(this.sal >= o.getSal()){
return 1;
}else{
return -1;
}
}
@Override
public void readFields(DataInput input) throws IOException {
// Deserialization: read the fields in the same order they were written
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
@Override
public void write(DataOutput output) throws IOException {
// Serialization
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
}

Main program entry point

package sort.object; 
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class EmployeeSortMain {
public static void main(String[] args) throws Exception {
//create the job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(EmployeeSortMain.class);
//specify the job's mapper and its output types k2 v2
job.setMapperClass(EmployeeSortMapper.class);
job.setMapOutputKeyClass(Employee.class);
job.setMapOutputValueClass(NullWritable.class);
//specify the job's input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//run the job
job.waitForCompletion(true);
}
}

Submitting the job to the cluster

hadoop jar objSort-1.0-SNAPSHOT.jar /input/emp.csv /output/sort
In HDFS the output of this program is stored as plain text. Each line holds one key/value pair separated by a tab; because the value is a NullWritable instance, nothing is actually written for it.

Concretely, each line is the string representation of an Employee object, i.e. whatever the Employee class's toString() method produces.
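For instance, given the emp.csv sample above and the toString() format shown earlier, the first line of the sorted output would be expected to look roughly like:

Employee [empno=7934, ename=MILLER, sal=1300, deptno=10]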

Deduplication

Task: list all job values from the employee table, keeping only distinct values, similar to the SQL statement:
select distinct job from EMP.
If k2 is the job, then the Reducer's input k3 is already the set of jobs with duplicates removed, and NullWritable is sufficient as the v2 type. The Reducer simply writes k3 out.

Mapper class

package distinct; 
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k1 v1 k2 v2
public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
// Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//Split into fields
String[] words = data.split(",");
//Output k2 = job title
context.write(new Text(words[2]), NullWritable.get());
}
}

Reducer class

package distinct; 
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text k3, Iterable<NullWritable> v3,Context context) throws
IOException, InterruptedException {
//Emit the key directly
context.write(k3, NullWritable.get());
}
}

Main program entry point

package distinct; 
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DistinctMain {
public static void main(String[] args) throws Exception {
//Create a job and the task entry point
Job job = Job.getInstance(new Configuration());
job.setJarByClass(DistinctMain.class); //class that contains the main method
//Specify the job's mapper and its output types <k2 v2>
job.setMapperClass(DistinctMapper.class);
job.setMapOutputKeyClass(Text.class); //type of k2
job.setMapOutputValueClass(NullWritable.class); //type of v2
//Specify the job's reducer and its output types <k4 v4>
job.setReducerClass(DistinctReducer.class);
job.setOutputKeyClass(Text.class); //type of k4
job.setOutputValueClass(NullWritable.class); //type of v4
//Specify the job's input and output
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//Run the job
job.waitForCompletion(true);
}
}

Submitting the job to the cluster

hadoop jar distinct-1.0-SNAPSHOT.jar /input/emp.csv /output/distinct

Multi-table join

[Figure: image-20230605163528277]

Use MapReduce to implement the equivalent of the following SQL statement:
select d.dname,e.ename from emp e,dept d where e.deptno=d.deptno
(1) On the Map side, read both files and tag every output value to mark whether the record comes from the employee table or the department table.
(2) On the Reduce side, process the values according to their tag.

Mapper class

package equaljoin; 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class EqualJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
//Get the record
String data = value1.toString();
//Split into fields
String[] words = data.split(",");
if(words.length == 8){
//Employee table record: emit (department number, employee name)
context.write(new IntWritable(Integer.parseInt(words[7])), new Text(words[1]));
}else{
//Department table record: emit (department number, "*" + department name)
context.write(new IntWritable(Integer.parseInt(words[0])), new Text("*"+words[1]));
}
}
}

Reducer class

package equaljoin; 
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// k4 = department name, v4 = names of all its employees
public class EqualJoinReducer extends Reducer<IntWritable, Text, Text, Text> {
@Override
protected void reduce(IntWritable k3, Iterable<Text> v3, Context context) throws
IOException, InterruptedException {
//Extract the department name and the employee names from value3
String dname = ""; //department name
String empListName = ""; //names of all the department's employees

for(Text str:v3){
String name = str.toString();
//Check whether the value carries the * marker
int index = name.indexOf("*");
if(index >=0){
//It is the department name; strip the leading *
dname = name.substring(1);
}else{
//It is an employee name
empListName = name+";"+empListName;
}
}
//Output
context.write(new Text(dname), new Text(empListName));
}
}

Main program entry point

package equaljoin; 
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class EqualJoinMain {
public static void main(String[] args) throws Exception {
// Create a job and the task entry point
Job job = Job.getInstance(new Configuration());
job.setJarByClass(EqualJoinMain.class); //class that contains the main method
//Specify the job's mapper and its output types <k2 v2>
job.setMapperClass(EqualJoinMapper.class);
job.setMapOutputKeyClass(IntWritable.class); //type of k2
job.setMapOutputValueClass(Text.class); //type of v2
//Specify the job's reducer and its output types <k4 v4>
job.setReducerClass(EqualJoinReducer.class);
job.setOutputKeyClass(Text.class); //type of k4
job.setOutputValueClass(Text.class); //type of v4
//Specify the job's input and output
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//Run the job
job.waitForCompletion(true);
}
}

Submitting the job to the cluster

hadoop jar equaljoin-1.0-SNAPSHOT.jar /inputjoin/ /output/join

Using MapReduce to compute the number of sales and the total sales amount for each year

Field name      Type          Nullable   Description
PROD_ID         int           No         product ID
CUST_ID         int           No         customer ID
TIME            Date          No         date of sale
CHANNEL_ID      int           No         channel ID
PROMO_ID        int           No         promotion ID
QUANTITY_SOLD   int           No         quantity sold (units)
AMOUNT_SOLD     float(10,2)   No         total sales amount (yuan)
import java.io.IOException;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// k2 = year, v2 = amount of one sale record
public class SalesMapper extends Mapper<LongWritable, Text, IntWritable, FloatWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
String date = fields[2];
int year = Integer.parseInt(date.substring(0, 4));
float amount = Float.parseFloat(fields[6]);
context.write(new IntWritable(year), new FloatWritable(amount));
}
}

// k4 = year, v4 = "number of sales,total amount" (a separate source file)
public class SalesReducer extends Reducer<IntWritable, FloatWritable, IntWritable, Text> {
@Override
protected void reduce(IntWritable key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
float total = 0;
for (FloatWritable value : values) {
count++;
total += value.get();
}
String result = count + "," + total;
context.write(key, new Text(result));
}
}
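The notes stop at the Mapper and Reducer; a driver in the style of the earlier Main classes is still needed to wire them together. The sketch below is an assumption (the class name SalesMain and the argument layout are not from the original notes).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalesMain {
public static void main(String[] args) throws Exception {
//create the job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(SalesMain.class);
//mapper and its output types: k2 = year, v2 = amount
job.setMapperClass(SalesMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(FloatWritable.class);
//reducer and its output types: k4 = year, v4 = "count,total"
job.setReducerClass(SalesReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
//input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}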

Using MapReduce to analyze users' search data from Sogou



Title: hadoop实验课-mapreduce

Author: TTYONG

Published: 2020-04-21 09:04

Last updated: 2023-07-04 11:07

Original link: http://tianyong.fun/hadoop%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%8A%80%E6%9C%AF%E4%B8%8E%E5%BA%94%E7%94%A8-mapreduce(%E5%AE%9E%E9%AA%8C%E8%AF%BE).html

License: please keep the original link and author attribution when reposting.
