最近一直在学习
hadoop 这是一个简单的关于MapReduce的示例
通过实现map和reduce2个函数完成操作
首先定义一个
自定义对象
class="java">class MyWriterble implements Writable{
long UpPackNum;
long DownPackNum;
long UpPayLoad;
long DownPayLoad;
public MyWriterble(){}
public MyWriterble(String UpPackNum,String DownPackNum,String UpPayLoad,String DownPayLoad){
this.UpPackNum=Long.parseLong(UpPackNum);
this.DownPackNum=Long.parseLong(DownPackNum);
this.UpPayLoad=Long.parseLong(UpPayLoad);
this.DownPayLoad=Long.parseLong(DownPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.UpPackNum=in.readLong();
this.DownPackNum=in.readLong();
this.UpPayLoad=in.readLong();
this.DownPayLoad=in.readLong();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(UpPackNum);
out.writeLong(DownPackNum);
out.writeLong(UpPayLoad);
out.writeLong(DownPayLoad);
}
@Override
public String toString() {
// TODO Auto-generated method stub
return UpPackNum + "\t" + DownPackNum + "\t" + UpPayLoad + "\t" + DownPayLoad;
}
}
其中类必须实现Writable
接口 和Java中的可
序列化接口一样
然后分别定义2个类 用于重写map和reduce方法
class MyMappera extends Mapper<LongWritable,Text, Text,MyWriterble>{
protected void map(LongWritable key, Text value,Context context) throws IOException ,InterruptedException {
String vals[]=value.toString().split("\t");
MyWriterble my=new MyWriterble(vals[21], vals[22], vals[23], vals[24]);
context.write(new Text(vals[2]),my);
};
}
Mapper中的
泛型 分别代表k1,v1,k2,v2类型
k1,v1即使分割每一行数据得到的键值对,key为位置,value为行内容
k2,v2为输出值得类型,k2表示每个手机号,所以为Text
class myreducers extends Reducer<Text,MyWriterble,Text,MyWriterble>{
protected void reduce(Text text, java.lang.Iterable<MyWriterble> wt,Context context) throws IOException ,InterruptedException {
long UpPackNum=0l;
long DownPackNum=0l;
long UpPayLoad=0l;
long DownPayLoad=0l;
for(MyWriterble mt:wt){
UpPackNum=UpPackNum+mt.UpPackNum;
DownPackNum=DownPackNum+mt.DownPackNum;
UpPayLoad=UpPayLoad+mt.UpPayLoad;
DownPayLoad=DownPayLoad+DownPayLoad;
}
context.write(text, new MyWriterble(UpPackNum+"", DownPackNum+"", UpPayLoad+"", DownPayLoad+""));
};
public class MyRIZHI {
public static void main(String[] args) throws Exception{
final String INPUT_PATHs = "hdfs://chaoren:9000/ncmdp_08500001_Net_20130515164000.dat";
final String OUT_PATHs = "hdfs://chaoren:9000/out";
Job job=new Job(new Configuration(),MyRIZHI.class.getSimpleName());
//1.1 指定输入文件路径
FileInputFormat.setInputPaths(job, INPUT_PATHs);
//指定哪个类用来格式化输入文件
job.setInputFormatClass(TextInputFormat.class);
//1.2指定自定义的Mapper类
job.setMapperClass(MyMappera.class);
//指定输出<k2,v2>的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MyWriterble.class);
//1.3 指定分区类
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
//1.4 TODO 排序、分区
//1.5 TODO (可选)合并
//2.2 指定自定义的reduce类
job.setReducerClass(myreducers.class);
//指定输出<k3,v3>的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MyWriterble.class);
//2.3 指定输出到哪里
FileOutputFormat.setOutputPath(job, new Path(OUT_PATHs));
//设定输出文件的格式化类
job.setOutputFormatClass(TextOutputFormat.class);
//把代码提交给JobTracker执行
job.waitForCompletion(true);
}
}