mapreducer_.NET_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > .NET > mapreducer

mapreducer

 2017/8/23 21:08:43  瞢甍瞢  程序员俱乐部  我要评论(0)
  • 摘要:一、需求描述:mapreduce笔试题:找出有共同好友的usersusr:friend,friend,friend...---------------A:B,C,D,F,E,OB:A,C,E,KC:F,A,D,ID:A,E,F,LE:B,C,D,M,LF:A,B,C,D,E,O,MG:A,C,D,E,FH:A,C,D,E,OI:A,OJ:B,OK:A,C,DL:D,E,FM:E,F,GO:A,H,I,J最终结果:A,BC,EA,CD,FA,DF,EA,FB,C,D,E,OB,ECC,FA,DD
  • 标签:Map

 

一、 需求描述:

mapreduce笔试题: 找出有共同好友的users

usr:friend,friend,friend...
---------------
A:B,C,D,F,E,O

B:A,C,E,K

C:F,A,D,I

D:A,E,F,L

E:B,C,D,M,L

F:A,B,C,D,E,O,M

G:A,C,D,E,F

H:A,C,D,E,O

I:A,O

J:B,O

K:A,C,D

L:D,E,F

M:E,F,G

O:A,H,I,J

class="p">最终结果:

A,B C,E

A,C D,F

A,D F,E

A,F B,C,D,E,O

B,E C

C,F A,D

D,E L

D,F A,E

D,L E,F

E,L D

F,M E

H,O A

I,O A

 

 

package com.friends.zb;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* 找朋友
*
* @author zhangbing
*
*/
public class Friends {

public static class M1 extends Mapper<LongWritable , Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String[] split = value.toString().split(":");
String[] split2 = split[1].split(",");
for(String s:split2){
context.write(new Text(s), new Text(split[0]));
}
}
}
public static class R1 extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {

List<String> list = new ArrayList<>();

for (Text t : values) {
list.add(t.toString());
}
Text k = new Text();
for (String s1 : list) {
for (String s2 : list) {
if(s1.compareTo(s2)<0){
k.set(s1+","+s2);
context.write(k, key);
}
}
String string = s1+","+key.toString();
if(s1.compareTo(key.toString())>0){
string=key.toString()+","+s1;
}
context.write(new Text(string), new Text("1"));
}
}
}

public static class M2 extends Mapper<LongWritable , Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
context.write(new Text(split[0]), new Text(split[1]));
}
}
public static class R2 extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
int count = 0;
StringBuffer sb = new StringBuffer();
for (Text text : values) {

String s = text.toString();
if("1".equals(s)){
count++;
}else{
sb.append(",").append(s);
}
}
if(count == 2 && sb.length()>0){
context.write(key,new Text(sb.toString().substring(1)));
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

job.setJarByClass(Friends.class);

job.setMapperClass(M1.class);

job.setReducerClass(R1.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
Path path = new Path(args[1]);
FileSystem fileSystem = FilterFileSystem.get(conf);
if(fileSystem.exists(path)){
fileSystem.delete(path,true);
}
FileOutputFormat.setOutputPath(job, path);

boolean b = job.waitForCompletion(true);
if(b){
Job job2 = Job.getInstance(conf);

job2.setJarByClass(Friends.class);

job2.setMapperClass(M2.class);

job2.setReducerClass(R2.class);

job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job2,new Path(args[1]));
Path path2 = new Path(args[2]);
FileSystem fileSystem2 = FilterFileSystem.get(conf);
if(fileSystem2.exists(path2)){
fileSystem2.delete(path2,true);
}
FileOutputFormat.setOutputPath(job2, path2);

job2.waitForCompletion(true);
}
}
}

发表评论
用户名: 匿名