HTTP_20130313143750.data (input data file)
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200
1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200

The input file needs to be uploaded to the Hadoop HDFS cluster.
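The exact upload commands are not part of the original post, but a typical way to put the file into the /flow/input directory used later would be:

hadoop fs -mkdir -p /flow/input
hadoop fs -put HTTP_20130313143750.data /flow/input/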
Flow.java
package com.tingcream.hadoopStudy.flowSum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Flow is a custom bean. To be passed between Hadoop's map and reduce phases it must
 * implement one of Hadoop's serialization interfaces, i.e. Writable or WritableComparable<T>.
 *
 * @author jelly
 */
public class Flow implements Writable {

    private String phone; // phone number
    private long up;      // uplink traffic
    private long down;    // downlink traffic
    private long sum;     // total traffic

    // no-arg constructor, required by Hadoop for deserialization
    public Flow() {
    }

    public Flow(String phone, long up, long down) {
        super();
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum = this.up + this.down;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.phone);
        out.writeLong(this.up);
        out.writeLong(this.down);
        out.writeLong(this.sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.sum = in.readLong();
    }

    @Override
    public String toString() {
        return this.up + "\t" + this.down + "\t" + this.sum;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public long getUp() {
        return up;
    }

    public void setUp(long up) {
        this.up = up;
    }

    public long getDown() {
        return down;
    }

    public void setDown(long down) {
        this.down = down;
    }

    public long getSum() {
        return sum;
    }
}
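As a quick sanity check of the serialization logic, the following standalone sketch (not from the original post; the class name FlowRoundTrip is made up) writes a Flow to a byte array and reads it back. The point is that readFields() must read the fields in exactly the order write() wrote them.

package com.tingcream.hadoopStudy.flowSum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowRoundTrip {
    public static void main(String[] args) throws IOException {
        Flow original = new Flow("13726230503", 2481, 24681);

        // serialize the bean into a byte array
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize it into a fresh instance
        Flow copy = new Flow();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // prints: 2481	24681	27162
    }
}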
FlowSumMapper.java (map program)
package com.tingcream.hadoopStudy.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * FlowSum map program.
 *
 * @author jelly
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, Flow> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // read one line of input
        String line = value.toString();
        // split it into fields
        String[] fields = StringUtils.split(line, "\t");
        // pick out the fields we need
        String phone = fields[1];
        long up = Long.parseLong(fields[7]);
        long down = Long.parseLong(fields[8]);
        // wrap the data as a key-value pair and emit it: <phone, flow>
        context.write(new Text(phone), new Flow(phone, up, down));
    }
}
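To make the field indexes concrete, here is a small standalone sketch (not part of the job; the class name is made up) that applies the same split logic to the first sample record, rewritten with tab separators as in the real data file:

package com.tingcream.hadoopStudy.flowSum;

import org.apache.commons.lang.StringUtils;

public class MapperFieldDemo {
    public static void main(String[] args) {
        String line = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC\t120.196.100.82"
                + "\ti02.c.aliimg.com\t24\t27\t2481\t24681\t200";
        String[] fields = StringUtils.split(line, "\t");
        System.out.println(fields[1]); // 13726230503 (phone number)
        System.out.println(fields[7]); // 2481  (used as the uplink value)
        System.out.println(fields[8]); // 24681 (used as the downlink value)
    }
}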
FlowSumReducer.java (reduce program)
package com.tingcream.hadoopStudy.flowSum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * FlowSum reduce program.
 *
 * @author jelly
 */
public class FlowSumReducer extends Reducer<Text, Flow, Text, Flow> {

    @Override
    protected void reduce(Text key, Iterable<Flow> values, Context context)
            throws IOException, InterruptedException {
        // input: <phone, {flow, flow, flow, ...}>
        // the reduce logic is simply to iterate over the values, accumulate the sums, and emit the result
        long up = 0;
        long down = 0;
        for (Flow flow : values) {
            up += flow.getUp();
            down += flow.getDown();
        }
        context.write(key, new Flow(key.toString(), up, down));
    }
}

FlowSumRunner.java (program entry class)
package com.tingcream.hadoopStudy.flowSum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Runner (entry) class.
 *
 * @author jelly
 */
public class FlowSumRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumRunner.class);
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // key/value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flow.class);

        // key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flow.class);

        FileInputFormat.setInputPaths(job, new Path(args[0])); // input path, e.g. /flow/input

        // if the output path given in the arguments already exists, delete it first
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path, e.g. /flow/output

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        System.exit(status);
    }
}
Submit the jar to Hadoop (the YARN cluster) and run it:
hadoop jar flow.jar com.tingcream.hadoopStudy.flowSum.FlowSumRunner /flow/input/ /flow/output/
The result of the run is as follows:
hadoop fs -cat /flow/output/part-r-00000
13480253104 720 800 1520
13502468823 408 29340 29748
13560439658 23568 1600 25168
13600217502 747408 800 748208
13602846565 48 7752 7800
13660577991 36 27840 27876
13719199419 0 800 800
13726230503 9924 98724 108648
13760778710 480 800 1280
13823070001 720 800 1520
13826544101 0 800 800
13922314466 12032 14880 26912
13925057413 252 44232 44484
13926251106 56 6908 6964
13926435656 6048 800 6848
15013685858 108 14636 14744
15920133257 80 12624 12704
15989002119 12 7752 7764
18320173382 72 38124 38196
OK, the output above is clearly correct, but it also shows something worth noting: the results come out sorted in ascending order by phone number (the key). What if we want the output sorted in descending order by total traffic instead?
We can run a second MapReduce job over the output above:
use the whole Flow object as the map/reduce key (with NullWritable as the value), and have the Flow bean override the compare method so that keys are compared by our own rule. MapReduce sorts the map output by key before it reaches the reducer, so ordering the keys by total traffic in descending order gives us the sorted result for free.
Example 2: sort by total traffic in descending order
Flow.java
package com.tingcream.hadoopStudy.flowSort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Flow is a custom bean. To be passed between Hadoop's map and reduce phases it must
 * implement one of Hadoop's serialization interfaces, i.e. Writable or WritableComparable<T>.
 *
 * @author jelly
 */
public class Flow implements WritableComparable<Flow> {

    private String phone; // phone number
    private long up;      // uplink traffic
    private long down;    // downlink traffic
    private long sum;     // total traffic

    // no-arg constructor, required by Hadoop for deserialization
    public Flow() {
    }

    public Flow(String phone, long up, long down) {
        super();
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum = this.up + this.down;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.phone);
        out.writeLong(this.up);
        out.writeLong(this.down);
        out.writeLong(this.sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.sum = in.readLong();
    }

    @Override
    public String toString() {
        return this.phone + "\t" + this.up + "\t" + this.down + "\t" + this.sum;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public long getUp() {
        return up;
    }

    public void setUp(long up) {
        this.up = up;
    }

    public long getDown() {
        return down;
    }

    public void setDown(long down) {
        this.down = down;
    }

    public long getSum() {
        return sum;
    }

    @Override
    public int compareTo(Flow o) {
        // order the beans by total traffic, descending; never return 0 so that
        // two different records with the same total are still kept as distinct keys
        return this.getSum() > o.getSum() ? -1 : 1;
    }
}
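The shuffle phase of MapReduce sorts the map output by key, which is why putting the whole bean into the key position produces a sorted result (with the default single reducer). The following standalone sketch (not part of the job; the class name is made up, and the sample values are taken from the data above) shows the ordering that compareTo() defines:

package com.tingcream.hadoopStudy.flowSort;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FlowSortDemo {
    public static void main(String[] args) {
        List<Flow> flows = new ArrayList<Flow>();
        flows.add(new Flow("13726230503", 2481, 24681));  // sum 27162
        flows.add(new Flow("13826544101", 0, 200));       // sum 200
        flows.add(new Flow("13600217502", 186852, 200));  // sum 187052

        Collections.sort(flows); // uses Flow.compareTo: descending by total traffic
        for (Flow f : flows) {
            System.out.println(f);
        }
        // prints 13600217502 first, then 13726230503, then 13826544101
    }
}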
FlowSortMapper.java (map program)
package com.tingcream.hadoopStudy.flowSort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * FlowSort map program.
 *
 * @author jelly
 */
public class FlowSortMapper extends Mapper<LongWritable, Text, Flow, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // read one line of input (a line of the first job's output: phone, up, down, sum)
        String line = value.toString();
        // split it into fields
        String[] fields = StringUtils.split(line, "\t");
        // pick out the fields we need
        String phone = fields[0];
        long up = Long.parseLong(fields[1]);
        long down = Long.parseLong(fields[2]);
        // wrap the data as a key-value pair and emit it: <flow, null>
        context.write(new Flow(phone, up, down), NullWritable.get());
    }
}

FlowSortReducer.java (reduce program)
package com.tingcream.hadoopStudy.flowSort;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * FlowSort reduce program.
 *
 * @author jelly
 */
public class FlowSortReducer extends Reducer<Flow, NullWritable, Flow, NullWritable> {

    @Override
    protected void reduce(Flow key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // input: <flow, null> -- the shuffle has already sorted the keys,
        // so the reducer simply writes each key back out
        context.write(key, NullWritable.get());
    }
}

FlowSortRunner.java (program entry class)
package com.tingcream.hadoopStudy.flowSort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Runner (entry) class.
 *
 * @author jelly
 */
public class FlowSortRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSortRunner.class);
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // map output key/value types (not needed here: they default to the job output types below)
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(Flow.class);

        // key/value types of the final output
        job.setOutputKeyClass(Flow.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0])); // input path

        // if the output path given in the arguments already exists, delete it first
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(), new FlowSortRunner(), args);
        System.exit(status);
    }
}

Submit the jar to the Hadoop cluster again and run it.
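The invocation mirrors the first job. The command below is only a sketch: the jar name and the output directory /flow/sortoutput/ are assumptions, and the input directory is assumed to be the output of a previous summing job:

hadoop jar flow.jar com.tingcream.hadoopStudy.flowSort.FlowSortRunner /flow/output/ /flow/sortoutput/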
The output of the sort job is:

13600217502 186852 200 187052
13726230503 2481 24681 27162
13925057413 63 11058 11121
18320173382 18 9531 9549
13502468823 102 7335 7437
13660577991 9 6960 6969
13922314466 3008 3720 6728
13560439658 5892 400 6292
15013685858 27 3659 3686
15920133257 20 3156 3176
13602846565 12 1938 1950
15989002119 3 1938 1941
13926251106 14 1727 1741
13926435656 1512 200 1712
13823070001 180 200 380
13480253104 180 200 380
13760778710 120 200 320
13826544101 0 200 200
13719199419 0 200 200

OK, the results are now displayed in descending order of total traffic; the output is correct!
Example 3: use a custom Partitioner so that the statistics for different phone-number prefixes (areas) are written to different output files.

AreaPartitioner.java (the custom partitioning rule)
package com.tingcream.hadoopStudy.flowArea;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<String, Integer>();

    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
        // partitions 0, 1, 2, 3, 4 and 5: six partitions in total
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // look up the partition number by the first three digits of the phone number;
        // any prefix that is not in the map falls into partition 5
        String phonePrefix = key.toString().substring(0, 3);
        return areaMap.get(phonePrefix) == null ? 5 : areaMap.get(phonePrefix);
    }
}
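Hadoop calls getPartition() once for every map output record; records with the same partition number go to the same reduce task and therefore end up in the same part-r-0000N file. A standalone sketch (not part of the job; the class name is made up, and the value is irrelevant to the partitioning decision) of how a few keys are mapped:

package com.tingcream.hadoopStudy.flowArea;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class AreaPartitionerDemo {
    public static void main(String[] args) {
        AreaPartitioner<Text, NullWritable> partitioner = new AreaPartitioner<Text, NullWritable>();

        // 6 is the total number of partitions, matching job.setNumReduceTasks(6)
        System.out.println(partitioner.getPartition(new Text("13502468823"), NullWritable.get(), 6)); // 0 (prefix 135)
        System.out.println(partitioner.getPartition(new Text("13726230503"), NullWritable.get(), 6)); // 2 (prefix 137)
        System.out.println(partitioner.getPartition(new Text("15920133257"), NullWritable.get(), 6)); // 5 (prefix 159 is not in areaMap)
    }
}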
Flow.java

package com.tingcream.hadoopStudy.flowArea;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Flow is a custom bean. To be passed between Hadoop's map and reduce phases it must
 * implement one of Hadoop's serialization interfaces, i.e. Writable or WritableComparable<T>.
 *
 * @author jelly
 */
public class Flow implements Writable {

    private String phone; // phone number
    private long up;      // uplink traffic
    private long down;    // downlink traffic
    private long sum;     // total traffic

    // no-arg constructor, required by Hadoop for deserialization
    public Flow() {
    }

    public Flow(String phone, long up, long down) {
        super();
        this.phone = phone;
        this.up = up;
        this.down = down;
        this.sum = this.up + this.down;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.phone);
        out.writeLong(this.up);
        out.writeLong(this.down);
        out.writeLong(this.sum);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.up = in.readLong();
        this.down = in.readLong();
        this.sum = in.readLong();
    }

    @Override
    public String toString() {
        return this.up + "\t" + this.down + "\t" + this.sum;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public long getUp() {
        return up;
    }

    public void setUp(long up) {
        this.up = up;
    }

    public long getDown() {
        return down;
    }

    public void setDown(long down) {
        this.down = down;
    }

    public long getSum() {
        return sum;
    }
}
FlowSumAreaMapper.java (map program)

package com.tingcream.hadoopStudy.flowArea;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * FlowSumArea map program.
 *
 * @author jelly
 */
public class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, Flow> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // read one line of input
        String line = value.toString();
        // split it into fields
        String[] fields = StringUtils.split(line, "\t");
        // pick out the fields we need
        String phone = fields[1];
        long up = Long.parseLong(fields[7]);
        long down = Long.parseLong(fields[8]);
        // wrap the data as a key-value pair and emit it: <phone, flow>
        context.write(new Text(phone), new Flow(phone, up, down));
    }
}
FlowSumAreaReducer.java (reduce program)

package com.tingcream.hadoopStudy.flowArea;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * FlowSumArea reduce program.
 *
 * @author jelly
 */
public class FlowSumAreaReducer extends Reducer<Text, Flow, Text, Flow> {

    @Override
    protected void reduce(Text key, Iterable<Flow> values, Context context)
            throws IOException, InterruptedException {
        // input: <phone, {flow, flow, flow, ...}>
        // the reduce logic is simply to iterate over the values, accumulate the sums, and emit the result
        long up = 0;
        long down = 0;
        for (Flow flow : values) {
            up += flow.getUp();
            down += flow.getDown();
        }
        context.write(key, new Flow(key.toString(), up, down));
    }
}
FlowSumAreaRunner.java (program entry class)

package com.tingcream.hadoopStudy.flowArea;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Runner (entry) class.
 *
 * @author jelly
 */
public class FlowSumAreaRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumAreaRunner.class);
        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // key/value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flow.class);

        // key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flow.class);

        // set our custom partitioning logic
        job.setPartitionerClass(AreaPartitioner.class);

        // set the number of reduce tasks; it should match the number of partitions
        job.setNumReduceTasks(6); // 6 partitions, or just 1 reduce task

        FileInputFormat.setInputPaths(job, new Path(args[0])); // input path, e.g. /flow/input

        // if the output path given in the arguments already exists, delete it first
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path, e.g. /flow/areaoutput

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new Configuration(), new FlowSumAreaRunner(), args);
        System.exit(status);
    }
}

Submit the jar to the Hadoop cluster and run it:

hadoop jar flow.jar com.tingcream.hadoopStudy.flowArea.FlowSumAreaRunner /flow/input/ /flow/areaoutput/
[hadoop@hadoopNode01 ~]$ hadoop fs -ls /flow/areaoutput/
Found 7 items
-rw-r--r-- 2 hadoop supergroup 0 2018-06-06 03:48 /flow/areaoutput/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 52 2018-06-06 03:48 /flow/areaoutput/part-r-00000
-rw-r--r-- 2 hadoop supergroup 79 2018-06-06 03:48 /flow/areaoutput/part-r-00001
-rw-r--r-- 2 hadoop supergroup 75 2018-06-06 03:48 /flow/areaoutput/part-r-00002
-rw-r--r-- 2 hadoop supergroup 46 2018-06-06 03:48 /flow/areaoutput/part-r-00003
-rw-r--r-- 2 hadoop supergroup 105 2018-06-06 03:48 /flow/areaoutput/part-r-00004
-rw-r--r-- 2 hadoop supergroup 123 2018-06-06 03:48 /flow/areaoutput/part-r-00005
Inspecting each output file shows that part-r-00000 contains the statistics for all phone numbers starting with 135, part-r-00001 contains those starting with 136, and so on, while part-r-00005 contains the numbers that do not start with 135, 136, 137, 138, or 139.
OK!