9. MapReduce Example: ChainMapReduce
Experiment Steps
1. Start Hadoop
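If Hadoop is not already running, start HDFS and YARN first. A minimal sketch, assuming the Hadoop sbin scripts are on the PATH (run jps afterwards to confirm that NameNode, DataNode, ResourceManager, and NodeManager are up):
start-dfs.sh
start-yarn.sh
jps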
2. Create the mapreduce10 directory
Create the /data/mapreduce10 directory on the local Linux filesystem.
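This takes a single command (the -p flag also creates any missing parent directories):
mkdir -p /data/mapreduce10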
3. Upload the data file to Linux
(Generate the text file yourself and place it in a folder of your choice; one way to create it is sketched after the sample data below.)
goods_0
袜子 189
毛衣 600
裤子 780
鞋子 30
呢子外套 90
牛仔外套 130
羽绒服 7
帽子 21
帽子 6
羽绒服 12
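Note that FilterMapper1 splits each line on a tab character, so the goods name and the number must be separated by a tab rather than spaces. A minimal sketch for creating the file, assuming a bash shell (printf expands \t and \n into real tab and newline characters):
printf '袜子\t189\n毛衣\t600\n裤子\t780\n鞋子\t30\n呢子外套\t90\n牛仔外套\t130\n羽绒服\t7\n帽子\t21\n帽子\t6\n羽绒服\t12\n' > /data/mapreduce10/goods_0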
4. Create a directory in HDFS
First create the /mymapreduce10/in directory on HDFS, then upload the goods_0 file from the local Linux directory /data/mapreduce10 into /mymapreduce10/in on HDFS.
hadoop fs -mkdir -p /mymapreduce10/in
hadoop fs -put /data/mapreduce10/goods_0 /mymapreduce10/in
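To confirm the upload succeeded, list the HDFS directory and print the file's contents:
hadoop fs -ls /mymapreduce10/in
hadoop fs -cat /mymapreduce10/in/goods_0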
5. Create a Java Project
Create a Java Project named mapreduce.
Under the mapreduce project, create a package named mapreduce9.
In the mapreduce9 package, create a class named ChainMapReduce.
6. Add the jar dependencies the project needs
Right-click the project and create a new folder named hadoop2lib to hold the jar packages the project depends on.
Copy the jar packages from the hadoop2lib directory under /data/mapreduce2 into the hadoop2lib folder of the mapreduce project in Eclipse.
(The hadoop2lib jars were downloaded from the Internet by myself rather than obtained through the commands in the lab tutorial.)
Select all the jars in the project's hadoop2lib folder and add them to the Build Path.
7. Write the program code
ChainMapReduce.java
package mapreduce9;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ChainMapReduce {

    private static final String INPUTPATH = "hdfs://192.168.109.10:9000/mymapreduce10/in/goods_0";
    private static final String OUTPUTPATH = "hdfs://192.168.109.10:9000/mymapreduce10/out";

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // Delete the output directory if it already exists
            FileSystem fileSystem = FileSystem.get(new URI(OUTPUTPATH), conf);
            if (fileSystem.exists(new Path(OUTPUTPATH))) {
                fileSystem.delete(new Path(OUTPUTPATH), true);
            }
            Job job = new Job(conf, ChainMapReduce.class.getSimpleName());
            FileInputFormat.addInputPath(job, new Path(INPUTPATH));
            job.setInputFormatClass(TextInputFormat.class);
            // Chain the stages: FilterMapper1 -> FilterMapper2 -> SumReducer -> FilterMapper3
            ChainMapper.addMapper(job, FilterMapper1.class, LongWritable.class, Text.class, Text.class, DoubleWritable.class, conf);
            ChainMapper.addMapper(job, FilterMapper2.class, Text.class, DoubleWritable.class, Text.class, DoubleWritable.class, conf);
            ChainReducer.setReducer(job, SumReducer.class, Text.class, DoubleWritable.class, Text.class, DoubleWritable.class, conf);
            ChainReducer.addMapper(job, FilterMapper3.class, Text.class, DoubleWritable.class, Text.class, DoubleWritable.class, conf);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setPartitionerClass(HashPartitioner.class);
            job.setNumReduceTasks(1);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            FileOutputFormat.setOutputPath(job, new Path(OUTPUTPATH));
            job.setOutputFormatClass(TextOutputFormat.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // First mapper: parse each tab-separated line and keep records whose value is <= 600
    public static class FilterMapper1 extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        private Text outKey = new Text();
        private DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line.length() > 0) {
                String[] splits = line.split("\t");
                double visit = Double.parseDouble(splits[1].trim());
                if (visit <= 600) {
                    outKey.set(splits[0]);
                    outValue.set(visit);
                    context.write(outKey, outValue);
                }
            }
        }
    }

    // Second mapper in the chain: keep only records whose value is < 100
    public static class FilterMapper2 extends Mapper<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void map(Text key, DoubleWritable value, Context context)
                throws IOException, InterruptedException {
            if (value.get() < 100) {
                context.write(key, value);
            }
        }
    }

    // Reducer: sum the values for each key
    public static class SumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private DoubleWritable outValue = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            for (DoubleWritable val : values) {
                sum += val.get();
            }
            outValue.set(sum);
            context.write(key, outValue);
        }
    }

    // Mapper chained after the reducer: keep only keys shorter than 3 characters
    public static class FilterMapper3 extends Mapper<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void map(Text key, DoubleWritable value, Context context)
                throws IOException, InterruptedException {
            if (key.toString().length() < 3) {
                System.out.println("Record written out: " + key.toString() + "++++" + value.toString());
                context.write(key, value);
            }
        }
    }
}
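To see what the chain does, it helps to trace the sample data by hand (my own walkthrough, not program output): FilterMapper1 drops records with a value above 600, removing 裤子 780; FilterMapper2 then keeps only values below 100, removing 袜子 189, 毛衣 600, and 牛仔外套 130; SumReducer sums the remaining values per key, giving 鞋子 30.0, 呢子外套 90.0, 羽绒服 19.0, and 帽子 27.0; finally FilterMapper3 keeps only keys shorter than 3 characters, so 呢子外套 and 羽绒服 are dropped and the job should write out 帽子 27.0 and 鞋子 30.0.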
8. Run the code
In the ChainMapReduce class file, right-click and choose Run As => Run on Hadoop to submit the MapReduce job to Hadoop.
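If you prefer the command line to Eclipse, the job can also be submitted with hadoop jar; a minimal sketch, assuming the project has been exported as a runnable jar named mapreduce.jar (the jar name here is hypothetical):
hadoop jar mapreduce.jar mapreduce9.ChainMapReduce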
9. View the results
After the job finishes, go back to the command line and check the results under /mymapreduce10/out in HDFS.
hadoop fs -ls /mymapreduce10/out
hadoop fs -cat /mymapreduce10/out/part-r-00000
Figure 1 shows my run's output and Figure 2 shows the expected result from the lab tutorial.
Comparing the two, the results are identical.
(Browser screenshot of the result page goes here.)