
hadoop (4) -- a MapReduce traffic statistics example (original)

Author: 朝如青丝暮成雪
Published: 2018-06-23 10:24:57   Category: hadoop
MapReduce is the core of Hadoop. Besides processing simple data such as strings and numbers, it can also handle custom beans (as long as they implement Hadoop's serialization interface).
In this post we will write another MapReduce example: computing the network traffic of mobile phone users.
We have some records of mobile users' browsing behaviour, as follows:

HTTP_20130313143750.data


1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200  
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4            4   0   264 0   200  
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99          2   4   132 1512    200  
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4            4   0   240 0   200  
1363157993044   18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99  iface.qiyi.com  视频网站    15  12  1527    2106    200  
1363157993055   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200  
1363157995033   15920133257 5C-0E-8B-C7-BA-20:CMCC  120.197.40.4    sug.so.360.cn   信息安全    20  20  3156    2936    200  
1363157983019   13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82          4   0   240 0   200  
1363157984041   13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4    s19.cnzz.com    站点统计    24  9   6960    690 200  
1363157973098   15013685858 5C-0E-8B-C7-F7-90:CMCC  120.197.40.4    rank.ie.sogou.com   搜索引擎    28  27  3659    3538    200  
1363157986029   15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99  www.umeng.com   站点统计    3   3   1938    180 200  
1363157992093   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          15  9   918 4938    200  
1363157986041   13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4            3   3   180 180 200  
1363157984040   13602846565 5C-0E-8B-8B-B6-00:CMCC  120.197.40.4    2052.flash2-http.qq.com 综合门户    15  12  1938    2910    200  
1363157995093   13922314466 00-FD-07-A2-EC-BA:CMCC  120.196.100.82  img.qfc.cn      12  12  3008    3720    200  
1363157982040   13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99  y0.ifengimg.com 综合门户    57  102 7335    110349  200  
1363157986072   18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99  input.shouji.sogou.com  搜索引擎    21  18  9531    2412    200  
1363157990043   13925057413 00-1F-64-E1-E6-9A:CMCC  120.196.100.55  t3.baidu.com    搜索引擎    69  63  11058   48243   200  
1363157988072   13760778710 00-FD-07-A4-7B-08:CMCC  120.196.100.82          2   2   120 120 200  
1363157985079   13823070001 20-7C-8F-70-68-1F:CMCC  120.196.100.99          6   3   360 180 200  
1363157985069   13600217502 00-1F-64-E2-E8-B1:CMCC  120.196.100.55          18  138 1080    186852  200
Upload the input file to the HDFS cluster:

hadoop  fs -mkdir  -p /flow/input/
hadoop fs -put   HTTP_20130313143750.data   /flow/input/
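To double-check that the file is in place before running the job, you can list the input directory:

hadoop fs -ls /flow/input/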
Case 1: compute each phone number's upstream, downstream and total traffic (upstream + downstream)
From the source data we need to compute, for every user (phone number), the sum of the upstream traffic (column 8), the downstream traffic (column 9) and the total traffic over all of that user's requests, and write the result to a separate output file.
We can wrap the phone number, upstream traffic, downstream traffic and total traffic in a custom bean. Since this bean has to be serialized when Hadoop ships it around, it must also implement Hadoop's serialization interface.



Flow.java


package com.tingcream.hadoopStudy.flowSum;  
import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;  
import org.apache.hadoop.io.Writable;  
  
/** 
 * Flow is our custom bean. To be passed between map and reduce in Hadoop, it must implement
 * one of Hadoop's serialization interfaces, such as Writable or WritableComparable<T>.
 *  
 * @author jelly 
 * 
 */  
public class Flow implements  Writable {  
      
   private String phone; //phone number
   private long  up;     //upstream traffic
   private long down;    //downstream traffic

   private long sum;     //total traffic

   //no-arg constructor (Hadoop needs it when deserializing)
    public Flow() {
    }

    //constructor
    public Flow(String phone, long up, long down) {  
        super();  
        this.phone = phone;  
        this.up = up;  
        this.down = down;  
        this.sum=this.up+this.down;  
    }  
  
    @Override  
    public void write(DataOutput out) throws IOException {  
      
       out.writeUTF(this.phone);  
       out.writeLong(this.up);  
       out.writeLong(this.down);  
       out.writeLong(this.sum);  
    }  
  
    @Override  
    public void readFields(DataInput in) throws IOException {  
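        //fields must be read back in exactly the same order they were written in write()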
          
          
        this.phone=in.readUTF();  
        this.up=in.readLong();  
        this.down=in.readLong();  
        this.sum=in.readLong();  
   
    }  
  
      
      
    @Override  
    public String toString() {  
   
        return   this.up+"\t"+this.down+"\t"+this.sum;  
    }  
  
  
    public String getPhone() {  
        return phone;  
    }  
  
    public void setPhone(String phone) {  
        this.phone = phone;  
    }  
  
    public long getUp() {  
        return up;  
    }  
  
    public void setUp(long up) {  
        this.up = up;  
    }  
  
    public long getDown() {  
        return down;  
    }  
  
    public void setDown(long down) {  
        this.down = down;  
    }  
    public long getSum() {  
        return sum;  
    }  
      
} 

FlowSumMapper.java   map program


package com.tingcream.hadoopStudy.flowSum;  
  
import java.io.IOException;  
  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.commons.lang.StringUtils;  
  
/** 
 * flowsum mapper
 *  
 * 
 * @author jelly 
 * 
 */  
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, Flow>{  
  
    @Override  
    protected void map(LongWritable key, Text value,  
             Context context)  
            throws IOException, InterruptedException {  
                //take one line of input
                String line = value.toString();
                //split it into fields
                String[] fields = StringUtils.split(line, "\t");
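                //note: StringUtils.split treats consecutive tabs as one separator, so empty columns are skipped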
                  
                //grab the fields we need
                String phone = fields[1];
                long  up= Long.parseLong(fields[7]);
                long  down = Long.parseLong(fields[8]);
                //wrap the data as a key-value pair and emit it:  <phone : flow>
                context.write(new Text(phone), new Flow(phone,up,down));  
       
    }  
  
}  

FlowSumReducer.java    reduce program


package com.tingcream.hadoopStudy.flowSum;  
  
import java.io.IOException;  
  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Reducer;  
  
/** 
 * flowsum reducer
 * @author jelly 
 * 
 */  
public class FlowSumReducer extends Reducer<Text, Flow, Text, Flow> {  
  
    @Override  
    protected void reduce(Text key, Iterable<Flow> values,  
              Context context)  
            throws IOException, InterruptedException {  
        //  <phone : {flow, flow, flow, flow}>
        //the reduce logic: iterate over the values, accumulate the sums and emit the result
        long up = 0;
        long down = 0;
        for(Flow flow:values ){  
             up+= flow.getUp();  
             down+= flow.getDown();  
        }  
        context.write(key, new Flow(key.toString(),up,down));  
           
    }  
  
}  
FlowSumRunner.java   entry class



package com.tingcream.hadoopStudy.flowSum;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
  
/** 
 * runner / entry class
 * @author jelly 
 */  
public class FlowSumRunner extends Configured implements Tool{  
  
  
    @Override  
    public int run(String[] args) throws Exception {  
          
        Configuration conf = new Configuration();     
          
        Job job = Job.getInstance(conf);  
          
        job.setJarByClass(FlowSumRunner.class);  
          
        job.setMapperClass(FlowSumMapper.class);  
        job.setReducerClass(FlowSumReducer.class);  
          
        //set the mapper's output key/value types
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(Flow.class);  
          
        //set the final output key/value types
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Flow.class);  
          
        FileInputFormat.setInputPaths(job, new Path(args[0]));//input path:  /flow/input
  
        //if the output path given as an argument already exists, delete it first
        Path output = new Path(args[1]);  
        FileSystem fs = FileSystem.get(conf);  
        if(fs.exists(output)){  
            fs.delete(output, true);  
        }  
          
        FileOutputFormat.setOutputPath(job, new Path(args[1]));//output path:  /flow/output
          
        return job.waitForCompletion(true)?0:1;  
    }  
      
      
    public static void main(String[] args) throws Exception {  
        int  status = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);  
        System.exit(status);  
    }  
}  

Submit the jar to Hadoop (the YARN cluster) and run it:
hadoop jar flow.jar  com.tingcream.hadoopStudy.flowSum.FlowSumRunner  /flow/input/ /flow/output/
The result is as follows:
hadoop fs -cat /flow/output/part-r-00000

13480253104     720     800     1520  
13502468823     408     29340   29748  
13560439658     23568   1600    25168  
13600217502     747408  800     748208  
13602846565     48      7752    7800  
13660577991     36      27840   27876  
13719199419     0       800     800  
13726230503     9924    98724   108648  
13760778710     480     800     1280  
13823070001     720     800     1520  
13826544101     0       800     800  
13922314466     12032   14880   26912  
13925057413     252     44232   44484  
13926251106     56      6908    6964  
13926435656     6048    800     6848  
15013685858     108     14636   14744  
15920133257     80      12624   12704  
15989002119     12      7752    7764  
18320173382     72      38124   38196 


OK, the output above is clearly correct, but we also notice something: the results are sorted in ascending order by phone number (the key). What if we want the output sorted by total traffic in descending order?
We can run a second MapReduce job over the output above:
use the whole Flow object as the map/reduce key (the value is then null), and let the Flow class override the comparison method so that records are compared by our own rule.

Case 2: sort by total traffic in descending order
Flow.java


package com.tingcream.hadoopStudy.flowSort;  
  
import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;  
  
import org.apache.hadoop.io.WritableComparable;  
  
  
/** 
 * Flow is our custom bean. To be passed between map and reduce in Hadoop, it must implement
 * one of Hadoop's serialization interfaces, such as Writable or WritableComparable<T>.
 *  
 * @author jelly 
 * 
 */  
public class Flow implements  WritableComparable<Flow> {  
      
   private String phone; //phone number
   private long  up;     //upstream traffic
   private long down;    //downstream traffic

   private long sum;     //total traffic

   //no-arg constructor (Hadoop needs it when deserializing)
    public Flow() {
    }

    //constructor
    public Flow(String phone, long up, long down) {  
        super();  
        this.phone = phone;  
        this.up = up;  
        this.down = down;  
        this.sum=this.up+this.down;  
    }  
  
    @Override  
    public void write(DataOutput out) throws IOException {  
      
       out.writeUTF(this.phone);  
       out.writeLong(this.up);  
       out.writeLong(this.down);  
       out.writeLong(this.sum);  
    }  
  
    @Override  
    public void readFields(DataInput in) throws IOException {  
          
          
        this.phone=in.readUTF();  
        this.up=in.readLong();  
        this.down=in.readLong();  
        this.sum=in.readLong();  
   
    }  
  
      
      
    @Override  
    public String toString() {  
   
        return   this.phone+"\t"+this.up+"\t"+this.down+"\t"+this.sum;  
    }  
  
  
    public String getPhone() {  
        return phone;  
    }  
  
    public void setPhone(String phone) {  
        this.phone = phone;  
    }  
  
    public long getUp() {  
        return up;  
    }  
  
    public void setUp(long up) {  
        this.up = up;  
    }  
  
    public long getDown() {  
        return down;  
    }  
  
    public void setDown(long down) {  
        this.down = down;  
    }  
  
    public long getSum() {  
        return sum;  
    }  
  
    @Override  
    public int compareTo(Flow o) {  
         //sort beans by total traffic in descending order
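         //note: this never returns 0, so two flows with the same total still remain distinct keys during grouping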
        return this.getSum()>o.getSum()?-1:1;  
          
    }  

}  


FlowSortMapper.java   map program


package com.tingcream.hadoopStudy.flowSort;  
 
import java.io.IOException;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.NullWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.commons.lang.StringUtils;  
  
/** 
 * flowsort mapper
 *  
 * 
 * @author jelly 
 * 
 */  
public class FlowSortMapper extends Mapper<LongWritable, Text, Flow, NullWritable>{  
  
    @Override  
    protected void map(LongWritable key, Text value,  
             Context context)  
            throws IOException, InterruptedException {  
                //take one line of input
                String line = value.toString();
                //split it into fields
                String[] fields = StringUtils.split(line, "\t");
                  
                //grab the fields we need
                String phone = fields[0];
                long  up= Long.parseLong(fields[1]);
                long  down = Long.parseLong(fields[2]);
                //wrap the data as a key-value pair and emit it:  <flow : null>
                context.write(new Flow(phone,up,down),  NullWritable.get());  
       
    }  
}  
FlowSortReducer.java   reduce program



package com.tingcream.hadoopStudy.flowSort;  
  
import java.io.IOException;  
  
import org.apache.hadoop.io.NullWritable;  
import org.apache.hadoop.mapreduce.Reducer;  
  
/** 
 * flowsort reducer
 * @author jelly 
 * 
 */  
public class FlowSortReducer extends Reducer<Flow, NullWritable, Flow, NullWritable> {  
  
    @Override  
    protected void reduce(Flow key, Iterable<NullWritable> values,  
              Context context)  
            throws IOException, InterruptedException {  
       
        //  <flow : null>
        //emit the key-value pair
         context.write(key, NullWritable.get());  
          
    }  
}  
FlowSortRunner.java   entry class



package com.tingcream.hadoopStudy.flowSort;  
  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.NullWritable;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
  
/** 
 * runner / entry class
 * @author jelly 
 */  
public class FlowSortRunner extends Configured implements Tool{  
  
  
    @Override  
    public int run(String[] args) throws Exception {  
          
        Configuration conf = new Configuration();     
          
        Job job = Job.getInstance(conf);  
          
        job.setJarByClass(FlowSortRunner.class);  
          
        job.setMapperClass(FlowSortMapper.class);  
        job.setReducerClass(FlowSortReducer.class);  
          
        //the mapper's output key/value types (not set here: they default to the job's output types below)
        //job.setMapOutputKeyClass(Text.class);  
        //job.setMapOutputValueClass(Flow.class);  
          
        //set the output key/value types
        job.setOutputKeyClass(Flow.class);  
        job.setOutputValueClass(NullWritable.class);  
          
        FileInputFormat.setInputPaths(job, new Path(args[0]));//input path
  
        //if the output path given as an argument already exists, delete it first
        Path output = new Path(args[1]);  
        FileSystem fs = FileSystem.get(conf);  
        if(fs.exists(output)){  
            fs.delete(output, true);  
        }  
          
        FileOutputFormat.setOutputPath(job, new Path(args[1]));//output path
          
        return job.waitForCompletion(true)?0:1;  
    }  
      
      
    public static void main(String[] args) throws Exception {  
        int  status = ToolRunner.run(new Configuration(), new FlowSortRunner(), args);  
        System.exit(status);  
    }  
}  
Submit the jar to the Hadoop cluster again and run it (this job keeps the default single reduce task, so the result is one globally sorted file):
hadoop jar flow.jar com.tingcream.hadoopStudy.flowSort.FlowSortRunner  /flow/output/  /flow/output2/  

The output is as follows:
hadoop fs -cat /flow/output2/part-r-00000
13600217502     186852  200     187052  
13726230503     2481    24681   27162  
13925057413     63      11058   11121  
18320173382     18      9531    9549  
13502468823     102     7335    7437  
13660577991     9       6960    6969  
13922314466     3008    3720    6728  
13560439658     5892    400     6292  
15013685858     27      3659    3686  
15920133257     20      3156    3176  
13602846565     12      1938    1950  
15989002119     3       1938    1941  
13926251106     14      1727    1741  
13926435656     1512    200     1712  
13823070001     180     200     380  
13480253104     180     200     380  
13760778710     120     200     320  
13826544101     0       200     200  
13719199419     0       200     200 
OK, the output is now listed in descending order of total traffic. The result is correct!

If the source data is huge, it is better to compute and output the statistics separately per region (the province a phone number belongs to). In that case we need several reduce tasks running in parallel, and because the map output then has to be partitioned (by default there is only one partition), we must define the partitioning rule ourselves.

Case 3: partitioned output



AreaPartitioner.java   custom partitioning rule


package com.tingcream.hadoopStudy.flowArea;  
  
import java.util.HashMap;  
  
import org.apache.hadoop.mapreduce.Partitioner;  
  
public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {  
      
private static HashMap<String,Integer> areaMap = new HashMap<String,Integer>();  
      
    static{  
        areaMap.put("135", 0);  
        areaMap.put("136", 1);  
        areaMap.put("137", 2);  
        areaMap.put("138", 3);  
        areaMap.put("139", 4);  
       
        //partitions 0, 1, 2, 3, 4 and 5: six partitions in total
          
    }  
  
    @Override  
    public int getPartition(KEY key, VALUE value, int numPartitions) {  
         
          // areaMap.get(key.toString().substring(0, 3));     
         String phonePrefix = key.toString().substring(0, 3);  
          return  areaMap.get(phonePrefix)==null?5:areaMap.get(phonePrefix);  
             
    }  
  
}  
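To get a feel for how the mapping works before submitting a job, a tiny local sanity check like the one below is enough. AreaPartitionerTest is a hypothetical helper class, not part of the original project; it only assumes the Hadoop jars are on the classpath:

package com.tingcream.hadoopStudy.flowArea;

import org.apache.hadoop.io.Text;

public class AreaPartitionerTest {

    public static void main(String[] args) {
        AreaPartitioner<Text, Flow> partitioner = new AreaPartitioner<Text, Flow>();
        //"137..." is mapped to partition 2, while an unknown prefix such as "150..." falls into partition 5
        System.out.println(partitioner.getPartition(new Text("13726230503"), null, 6)); //prints 2
        System.out.println(partitioner.getPartition(new Text("15013685858"), null, 6)); //prints 5
    }
}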
Flow.java



package com.tingcream.hadoopStudy.flowArea;  
  
import java.io.DataInput;  
import java.io.DataOutput;  
import java.io.IOException;  
  
import org.apache.hadoop.io.Writable;  
  
  
/** 
 * Flow is our custom bean. To be passed between map and reduce in Hadoop, it must implement
 * one of Hadoop's serialization interfaces, such as Writable or WritableComparable<T>.
 *  
 * @author jelly 
 * 
 */  
public class Flow implements  Writable {  
      
   private String phone; //phone number
   private long  up;     //upstream traffic
   private long down;    //downstream traffic

   private long sum;     //total traffic

   //no-arg constructor (Hadoop needs it when deserializing)
    public Flow() {
    }

    //constructor
    public Flow(String phone, long up, long down) {  
        super();  
        this.phone = phone;  
        this.up = up;  
        this.down = down;  
        this.sum=this.up+this.down;  
    }  
  
    @Override  
    public void write(DataOutput out) throws IOException {  
      
       out.writeUTF(this.phone);  
       out.writeLong(this.up);  
       out.writeLong(this.down);  
       out.writeLong(this.sum);  
    }  
  
    @Override  
    public void readFields(DataInput in) throws IOException {  
          
          
        this.phone=in.readUTF();  
        this.up=in.readLong();  
        this.down=in.readLong();  
        this.sum=in.readLong();  
   
    }  
  
      
      
    @Override  
    public String toString() {  
   
        return   this.up+"\t"+this.down+"\t"+this.sum;  
    }  
  
  
    public String getPhone() {  
        return phone;  
    }  
  
    public void setPhone(String phone) {  
        this.phone = phone;  
    }  
  
    public long getUp() {  
        return up;  
    }  
  
    public void setUp(long up) {  
        this.up = up;  
    }  
  
    public long getDown() {  
        return down;  
    }  
  
    public void setDown(long down) {  
        this.down = down;  
    }  
    public long getSum() {  
        return sum;  
    }  

}  
FlowSumAreaMapper.java   map program



package com.tingcream.hadoopStudy.flowArea;  
  
import java.io.IOException;  
  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.commons.lang.StringUtils;  
  
/** 
 * flowsum area mapper
 *  
 * 
 * @author jelly 
 * 
 */  
public class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, Flow>{  
  
    @Override  
    protected void map(LongWritable key, Text value,  
             Context context)  
            throws IOException, InterruptedException {  
                //take one line of input
                String line = value.toString();
                //split it into fields
                String[] fields = StringUtils.split(line, "\t");
                  
                //grab the fields we need
                String phone = fields[1];
                long  up= Long.parseLong(fields[7]);
                long  down = Long.parseLong(fields[8]);
                //wrap the data as a key-value pair and emit it:  <phone : flow>
                context.write(new Text(phone), new Flow(phone,up,down));  
       
    }  
  
}  
FlowSumAreaReducer.java   reduce program



package com.tingcream.hadoopStudy.flowArea;  
  
import java.io.IOException;  
  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Reducer;  
  
/** 
 * flowsum area reducer
 * @author jelly 
 * 
 */  
public class FlowSumAreaReducer extends Reducer<Text, Flow, Text, Flow> {  
  
    @Override  
    protected void reduce(Text key, Iterable<Flow> values,  
              Context context)  
            throws IOException, InterruptedException {  
        //  <phone : {flow, flow, flow, flow}>
        //the reduce logic: iterate over the values, accumulate the sums and emit the result
        long up = 0;
        long down = 0;
        for(Flow flow:values ){  
             up+= flow.getUp();  
             down+= flow.getDown();  
        }  
        context.write(key, new Flow(key.toString(),up,down));  
           
    }  
  
}  
FlowSumAreaRunner.java   entry class



package com.tingcream.hadoopStudy.flowArea;  
  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
  
/** 
 * runner / entry class
 * @author jelly 
 */  
public class FlowSumAreaRunner extends Configured implements Tool{  
  
  
    @Override  
    public int run(String[] args) throws Exception {  
          
        Configuration conf = new Configuration();     
          
        Job job = Job.getInstance(conf);  
          
        job.setJarByClass(FlowSumAreaRunner.class);  
          
        job.setMapperClass(FlowSumAreaMapper.class);  
        job.setReducerClass(FlowSumAreaReducer.class);  
          
        //set the mapper's output key/value types
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(Flow.class);  
          
        //set the final output key/value types
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Flow.class);  
          
          
        //set our custom partitioning logic
        job.setPartitionerClass(AreaPartitioner.class);
        //set the number of reduce tasks; it should match the number of partitions
        job.setNumReduceTasks(6); //6 (one per partition), or 1
          
          
        FileInputFormat.setInputPaths(job, new Path(args[0]));//input path:  /flow/input
  
        //if the output path given as an argument already exists, delete it first
        Path output = new Path(args[1]);  
        FileSystem fs = FileSystem.get(conf);  
        if(fs.exists(output)){  
            fs.delete(output, true);  
        }  
          
        FileOutputFormat.setOutputPath(job, new Path(args[1]));//output path:  /flow/areaoutput
          
        return job.waitForCompletion(true)?0:1;  
    }  
      
      
    public static void main(String[] args) throws Exception {  
        int  status = ToolRunner.run(new Configuration(), new FlowSumAreaRunner(), args);  
        System.exit(status);  
    }  
}  
hadoop jar flow.jar com.tingcream.hadoopStudy.flowArea.FlowSumAreaRunner  /flow/input/ /flow/areaoutput/ 
The output consists of six files, one per reduce task (part-r-00000 through part-r-00005).


[hadoop@hadoopNode01 ~]$ hadoop fs -ls /flow/areaoutput/ 
Found 7 items
-rw-r--r--   2 hadoop supergroup          0 2018-06-06 03:48 /flow/areaoutput/_SUCCESS
-rw-r--r--   2 hadoop supergroup         52 2018-06-06 03:48 /flow/areaoutput/part-r-00000
-rw-r--r--   2 hadoop supergroup         79 2018-06-06 03:48 /flow/areaoutput/part-r-00001
-rw-r--r--   2 hadoop supergroup         75 2018-06-06 03:48 /flow/areaoutput/part-r-00002
-rw-r--r--   2 hadoop supergroup         46 2018-06-06 03:48 /flow/areaoutput/part-r-00003
-rw-r--r--   2 hadoop supergroup        105 2018-06-06 03:48 /flow/areaoutput/part-r-00004
-rw-r--r--   2 hadoop supergroup        123 2018-06-06 03:48 /flow/areaoutput/part-r-00005

Looking at each output file, part-r-00000 contains the statistics for all phone numbers starting with 135, part-r-00001 contains those starting with 136, ... and part-r-00005 contains every phone number that does not start with 135, 136, 137, 138 or 139.
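For example, the 137-prefixed numbers end up in partition 2, so they can be viewed the same way as before:

hadoop fs -cat /flow/areaoutput/part-r-00002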

OK!







Keywords:  hadoop  mapreduce  big data