Hadoop Theory Class - Friend Recommendation


Friend Recommendation

Relationship Graph

[Figure: friend relationship graph]

Data Processing

Input Data

Each line takes one person as the subject and lists that person's direct friends.

tom cat hello hadoop
cat tom hive
hive cat hadoop world hello mr
mr hive hello
world hadoop hive hello mr
hadoop hive tom world
hello world tom hive mr

Processing the Data

Map phase output

Key-value pairs describe the relationship between two people: the value 0 marks a direct friendship and 1 marks an indirect relationship (the two appear together in someone else's friend list).

<'tom_cat': 0>, <'tom_hello': 0>, <'tom_hadoop': 0>, <'cat_hello': 1>, <'cat_hadoop': 1>, <'hello_hadoop': 1>
<'cat_tom': 0>, <'cat_hive': 0>, <'tom_hive': 1>
<'hive_cat': 0>, <'hive_hadoop': 0>, <'hive_world': 0>, <'hive_hello': 0>, <'hive_mr': 0>, <'cat_hadoop': 1>, <'cat_world': 1>, <'cat_hello': 1>, <'cat_mr': 1>, <'hadoop_world': 1>, <'hadoop_hello': 1>, <'hadoop_mr': 1>, <'world_hello': 1>, <'world_mr': 1>, <'hello_mr': 1>,
....

The map function joins the two names into a single key. Pay attention to whether the names are ordered ascending or descending and keep that order consistent, so the same pair always produces the same key.

Reduce phase input

Pairs whose two names appear in different orders are recorded as the same relationship, so all markers for a pair are grouped under a single key.

<tom_cat:(0,0)>, <tom_hadoop:(0,0)>, <tom_hive:(1,1,1)>, <tom_world:(1,1)>,
<hadoop_world:(0,0,1)>, <hadoop_hello:(1,1,1)>
Reduce phase output

For each pair that is only indirectly related, output how many common friends they have; any pair containing a direct-friendship marker (0) is filtered out. For example, cat_hadoop 2 means cat and hadoop are not yet friends but share two common friends (tom and hive).

cat_hadoop	2
cat_hello	2
cat_mr	1
cat_world	1
hadoop_hello	3
hadoop_mr	1
hive_tom	3
mr_tom	1
mr_world	2
tom_world	2
Code
Mapper
package sudo.edu.hadoop.mapreduce.recommendFriend;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class recommendFriendMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text key2 = new Text();
    IntWritable value2 = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line: a person followed by that person's direct friends.
        String data = value.toString();
        String list[] = data.split(" ");
        for (int m = 0; m < list.length - 1; m++) {
            for (int n = m + 1; n < list.length; n++) {
                if (m == 0) {
                    // list[0] is the line owner, so (list[0], list[n]) is a direct friendship: flag 0
                    String outcome = linkStr(list[0], list[n]);
                    key2.set(outcome);
                    value2.set(0);
                    context.write(key2, value2);
                } else {
                    // two friends of the line owner form an indirect (candidate) relationship: flag 1
                    String outcome = linkStr(list[m], list[n]);
                    key2.set(outcome);
                    value2.set(1);
                    context.write(key2, value2);
                }
            }
        }
    }

    // Builds the pair key with the lexicographically larger name first, so the same
    // two people always produce the same key no matter which line emits them.
    protected static String linkStr(String a1, String a2) {
        if (a1.compareTo(a2) > 0) {
            return a1 + "_" + a2;
        } else {
            return a2 + "_" + a1;
        }
    }
}
Reducer
package sudo.edu.hadoop.mapreduce.recommendFriend;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class recommendFriendReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable v4 = new IntWritable();

    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3, Context context) throws IOException, InterruptedException {
        int total = 0;        // number of common friends for this pair
        boolean flag = true;  // set to false once a direct-friendship marker (0) is seen
        for (IntWritable value : v3) {
            if (value.get() == 1) {
                total += 1;
            } else {
                // the two people are already direct friends, so no recommendation is needed
                flag = false;
                break;
            }
        }
        if (flag) {
            v4.set(total);
            context.write(k3, v4);
        }
    }
}
Main
package sudo.edu.hadoop.mapreduce.recommendFriend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class recommendFriendMain {
    public static void main(String args[]) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(recommendFriendMain.class);
        // mapper
        job.setMapperClass(recommendFriendMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // reducer
        job.setReducerClass(recommendFriendReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths come from the command line
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
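
A typical way to submit the job, assuming the classes above are packed into a jar named recommendFriend.jar and that the HDFS paths are only illustrative:

hadoop jar recommendFriend.jar sudo.edu.hadoop.mapreduce.recommendFriend.recommendFriendMain /input/friend.txt /output/friend1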
Result

[Figure: output of the first job]

Post-processing of the xx_xx output

No custom reduce logic is needed; this second job only post-processes the xx_xx pairs produced by the previous stage.

Mapper
package sudo.edu.hadoop.mapreduce.recommendFriend2;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class mapper extends Mapper<LongWritable, Text, RecommentResInfo, NullWritable> {
    RecommentResInfo keySend = new RecommentResInfo();

    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        // input line from the first job, e.g. "xx_xx\t5"
        String line = v1.toString();
        String words[] = line.split("\t");
        String personStr = words[0];
        String[] ps = personStr.split("_");
        String p1Str = ps[0];
        String p2Str = ps[1];
        int comNum = Integer.parseInt(words[1]);
        // emit the pair in both directions so each person sees the other as a candidate
        keySend.setName(p1Str);
        keySend.setRecommentName(p2Str);
        keySend.setCommonNum(comNum);
        context.write(keySend, NullWritable.get());
        keySend.setName(p2Str);
        keySend.setRecommentName(p1Str);
        context.write(keySend, NullWritable.get());
    }
}
RecommentResInfo (serializable, sortable person/recommendation class)
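A minimal sketch of the class, assuming only what the mapper above implies (fields name, recommentName and commonNum with the corresponding setters): it implements WritableComparable so the key can be serialized, and its compareTo groups rows by person and ranks candidates by the number of common friends in descending order. The field layout and comparison rule are assumptions, not the author's original code.

package sudo.edu.hadoop.mapreduce.recommendFriend2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Minimal sketch of the key class: Writable for serialization, Comparable for sorting.
public class RecommentResInfo implements WritableComparable<RecommentResInfo> {
    private String name = "";           // the person who receives the recommendation
    private String recommentName = "";  // the recommended friend
    private int commonNum;              // number of common friends

    public void setName(String name) { this.name = name; }
    public void setRecommentName(String recommentName) { this.recommentName = recommentName; }
    public void setCommonNum(int commonNum) { this.commonNum = commonNum; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(recommentName);
        out.writeInt(commonNum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        recommentName = in.readUTF();
        commonNum = in.readInt();
    }

    @Override
    public int compareTo(RecommentResInfo other) {
        // Group rows by person, rank candidates by common-friend count (highest first),
        // and break ties by candidate name so distinct keys never compare as equal.
        int cmp = name.compareTo(other.name);
        if (cmp != 0) {
            return cmp;
        }
        cmp = Integer.compare(other.commonNum, commonNum);
        if (cmp != 0) {
            return cmp;
        }
        return recommentName.compareTo(other.recommentName);
    }

    @Override
    public String toString() {
        return name + "\t" + recommentName + "\t" + commonNum;
    }
}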
Main
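A minimal sketch of the driver for this second job; the class name recommendFriend2Main and the argument layout are assumptions. No reducer class is set here: under this reading, the default identity reducer is kept so the shuffle sorts the RecommentResInfo keys via compareTo (setting the number of reduce tasks to 0 would skip sorting entirely).

package sudo.edu.hadoop.mapreduce.recommendFriend2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class recommendFriend2Main {
    public static void main(String args[]) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(recommendFriend2Main.class);
        // mapper
        job.setMapperClass(mapper.class);
        job.setMapOutputKeyClass(RecommentResInfo.class);
        job.setMapOutputValueClass(NullWritable.class);
        // no custom reducer: the default identity reducer is enough, and the shuffle
        // sorts the RecommentResInfo keys using compareTo
        job.setOutputKeyClass(RecommentResInfo.class);
        job.setOutputValueClass(NullWritable.class);
        // args[0]: output directory of the first job, args[1]: final output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}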
Result

[Figure: output of the second job]

