一、 需求描述:
usr:friend,friend,friend...
---------------
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
class="p">最终结果:
A,B C,E
A,C D,F
A,D F,E
A,F B,C,D,E,O
B,E C
C,F A,D
D,E L
D,F A,E
D,L E,F
E,L D
F,M E
H,O A
I,O A
package com.friends.zb;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 找朋友
*
* @author zhangbing
*
*/
public class Friends {
public static class M1 extends Mapper<LongWritable , Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String[] split = value.toString().split(":");
String[] split2 = split[1].split(",");
for(String s:split2){
context.write(new Text(s), new Text(split[0]));
}
}
}
public static class R1 extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
List<String> list = new ArrayList<>();
for (Text t : values) {
list.add(t.toString());
}
Text k = new Text();
for (String s1 : list) {
for (String s2 : list) {
if(s1.compareTo(s2)<0){
k.set(s1+","+s2);
context.write(k, key);
}
}
String string = s1+","+key.toString();
if(s1.compareTo(key.toString())>0){
string=key.toString()+","+s1;
}
context.write(new Text(string), new Text("1"));
}
}
}
public static class M2 extends Mapper<LongWritable , Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
context.write(new Text(split[0]), new Text(split[1]));
}
}
public static class R2 extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
int count = 0;
StringBuffer sb = new StringBuffer();
for (Text text : values) {
String s = text.toString();
if("1".equals(s)){
count++;
}else{
sb.append(",").append(s);
}
}
if(count == 2 && sb.length()>0){
context.write(key,new Text(sb.toString().substring(1)));
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Friends.class);
job.setMapperClass(M1.class);
job.setReducerClass(R1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
Path path = new Path(args[1]);
FileSystem fileSystem = FilterFileSystem.get(conf);
if(fileSystem.exists(path)){
fileSystem.delete(path,true);
}
FileOutputFormat.setOutputPath(job, path);
boolean b = job.waitForCompletion(true);
if(b){
Job job2 = Job.getInstance(conf);
job2.setJarByClass(Friends.class);
job2.setMapperClass(M2.class);
job2.setReducerClass(R2.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job2,new Path(args[1]));
Path path2 = new Path(args[2]);
FileSystem fileSystem2 = FilterFileSystem.get(conf);
if(fileSystem2.exists(path2)){
fileSystem2.delete(path2,true);
}
FileOutputFormat.setOutputPath(job2, path2);
job2.waitForCompletion(true);
}
}
}