10.Mapreduce实例——MapReduce自定义输入格式
Mapreduce实例——MapReduce自定义输入格式
实验步骤
1.开启Hadoop
2.新建mapreduce11目录
在Linux本地新建/data/mapreduce11目录
3. 上传文件到linux中
(自行生成文本文件,放到个人指定文件夹下)
cat1文件
52001 有机蔬果 601
52002 有机肉类水产 602
52003 有机粮油干货 603
52004 有机冲饮 604
52005 其它 605
4.在HDFS中新建目录
首先在HDFS上新建/mymapreduce11/in目录,然后将Linux本地/data/mapreduce11目录下的cat1文件导入到HDFS的/mymapreduce11/in目录中。
hadoop fs -mkdir -p /mymapreduce11/in
hadoop fs -put /root/data/mapreduce11/cat1 /mymapreduce11/in
5.新建Java Project项目
新建Java Project项目,项目名为mapreduce。
在mapreduce项目下新建包,包名为mapreduce10。
在mapreduce10包下新建类,类名为FileKeyInputFormat、FileKeyRecordReader、FileKeyMR。
6.添加项目所需依赖的jar包
右键项目,新建一个文件夹,命名为:hadoop2lib,用于存放项目所需的jar包。
将/data/mapreduce2目录下,hadoop2lib目录中的jar包,拷贝到eclipse中mapreduce2项目的hadoop2lib目录下。
hadoop2lib为自己从网上下载的,并不是通过实验教程里的命令下载的
选中所有项目hadoop2lib目录下所有jar包,并添加到Build Path中。
7.编写程序代码
(1)FileKeyInputFormat.java
package mapreduce10; import java.io.IOException; import java.util.List; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class FileKeyInputFormat extends FileInputFormat{ public FileKeyInputFormat(){} public RecordReader createRecordReader(InputSplit split,TaskAttemptContext tac) throws IOException,InterruptedException{ FileKeyRecordReader fkrr=new FileKeyRecordReader(); try { fkrr.initialize(split,tac); } catch (Exception e) { e.printStackTrace(); } return fkrr; } protected long computeSplitSize(long blockSize,long minSize,long maxSize){ return super.computeSplitSize(blockSize,minSize,maxSize); } public List getSplits(JobContext arg0)throws IOException{ return super.getSplits(arg0); } protected boolean isSplitable(JobContext context,Path filename){ return true; } protected List listStatus(JobContext arg0)throws IOException{ return super.listStatus(arg0); } }
(2)FileKeyMR.java
package mapreduce10; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; public class FileKeyMR{ public static class Map extends Mapper
(3)FileKeyRecordReader.java
package mapreduce10; import java.io.IOException; import java.io.InterruptedIOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; public class FileKeyRecordReader extends RecordReader{ public FileKeyRecordReader(){} String fn; LineRecordReader lrr=new LineRecordReader(); public void initialize(InputSplit arg0,TaskAttemptContext arg1) throws IOException,InterruptedException{ lrr.initialize(arg0, arg1); this.fn=((FileSplit)arg0).getPath().getName(); } public void close()throws IOException{ lrr.close(); } public Text getCurrentKey()throws IOException,InterruptedException{ System.out.println("CurrentKey"); LongWritable lw=lrr.getCurrentKey(); Text key =new Text("("+fn+"@"+lw+")"); System.out.println("key--"+key); return key; } public Text getCurrentValue()throws IOException,InterruptedException{ return lrr.getCurrentValue(); } public float getProgress()throws IOException,InterruptedException{ return 0; } public boolean nextKeyValue() throws IOException,InterruptedIOException{ return lrr.nextKeyValue(); } }
8.运行代码
在FileKeyMR类文件中,右键并点击=>Run As=>Run on Hadoop选项,将MapReduce任务提交到Hadoop中。
9.查看实验结果
待执行完毕后,进入命令模式下,在HDFS中/mymapreduce11/out查看实验结果。
hadoop fs -ls /mymapreduce11/out
hadoop fs -cat /mymapreduce11/out/part-r-00000
图一为我的运行结果,图二为实验结果
经过对比,发现结果一样
此处为浏览器截图