03_MapReduce框架原理_3.10 OutputFormat 数据输出类
1. 说明
2. 常用实现类
3. 使用 自定义 OutputFormat类
步骤1 自定义 OutputFormat 类，继承 FileOutputFormat
重写 getRecordWriter方法
步骤2 自定义 RecordWriter类 继承 RecordWriter
1. 创建 FileSystem对象
2. 获取 输出流对象
3. 写出数据
4. 关闭流
步骤3 驱动类中 指定OutputFormat 实现类
1. job.setOutputFormatClass(classOf[CustOutputFormat])
2. 指定作业输出目录（_SUCCESS 标识文件存放于此；即使使用自定义 OutputFormat，只要它继承 FileOutputFormat，就必须调用 setOutputPath，否则 checkOutputSpecs 会抛出 InvalidJobConfException）
FileOutputFormat.setOutputPath(job, new Path("/output1"))
4. 案例
package OutputFormatPk {

  import java.lang
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
  import org.apache.hadoop.io.{IOUtils, IntWritable, LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
  import org.apache.hadoop.mapreduce._
  import scala.util.matching.Regex

  /** Mapper: splits each input line on spaces and emits (word, 1). */
  class WCComMapper extends Mapper[LongWritable, Text, Text, IntWritable] {

    // Reused across calls to avoid per-record allocation (standard Hadoop idiom).
    var text = new Text
    var intWritable = new IntWritable(1)

    // Called once per input record (one line of the split).
    override def map(key: LongWritable, value: Text,
                     context: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit = {
      // 1. Get the line, 2. split on spaces, 3. emit each word with count 1.
      value.toString.split(" ").foreach { word =>
        text.set(word)
        context.write(text, intWritable)
      }
    }
  }

  /** Reducer: sums the per-word counts produced by the mappers/combiners. */
  class WCComReducer extends Reducer[Text, IntWritable, Text, IntWritable] {

    private val intWritable = new IntWritable

    // Called once per distinct key with all of its shuffled values.
    override def reduce(key: Text, values: lang.Iterable[IntWritable],
                        context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      var sum: Int = 0
      // java.lang.Iterable.forEach via SAM conversion (Scala 2.12+).
      values.forEach(sum += _.get)
      intWritable.set(sum)
      context.write(key, intWritable)
    }
  }

  /** Driver: configures the job, wires in the custom OutputFormat, and submits. */
  object Driver {
    def main(args: Array[String]): Unit = {
      // 1. Configuration (reads core-default.xml / core-site.xml) and Job instance.
      val configuration = new Configuration
      val job: Job = Job.getInstance(configuration)

      // 2. Register this driver's jar.
      job.setJarByClass(this.getClass)
      job.setJobName("scala mr")

      // 3. Mapper / Reducer classes.
      job.setMapperClass(classOf[WCComMapper])
      job.setReducerClass(classOf[WCComReducer])

      // 4. Map-output key/value types.
      job.setMapOutputKeyClass(classOf[Text])
      job.setMapOutputValueClass(classOf[IntWritable])

      // 5. Final output key/value types.
      job.setOutputKeyClass(classOf[Text])
      job.setOutputValueClass(classOf[IntWritable])

      // 6. Input path and job output directory.
      //    FIX: the output path was commented out. Even with a custom OutputFormat,
      //    CustOutputFormat extends FileOutputFormat and inherits checkOutputSpecs,
      //    which throws InvalidJobConfException ("Output directory not set") when no
      //    path is configured. The directory also receives the _SUCCESS marker.
      FileInputFormat.setInputPaths(job, "src/main/data/input/1.txt")
      FileOutputFormat.setOutputPath(job, new Path("src/main/data/output1"))

      // 7. Use the custom OutputFormat instead of the default TextOutputFormat.
      job.setOutputFormatClass(classOf[CustOutputFormat])

      // 8. Submit and block; exit 0 on success, 1 on failure.
      val ok: Boolean = job.waitForCompletion(true)
      System.exit(if (ok) 0 else 1)
    }
  }

  /**
   * Custom OutputFormat. Type parameters are the reducer's output key/value
   * types (Text, IntWritable). Only getRecordWriter needs overriding.
   */
  class CustOutputFormat extends FileOutputFormat[Text, IntWritable] {
    override def getRecordWriter(job: TaskAttemptContext): RecordWriter[Text, IntWritable] =
      new CustRecordWriter(job)
  }

  /**
   * Custom RecordWriter. Requirement: keys that are purely digits go to
   * num.txt; everything else (e.g. Chinese words) goes to ch.txt.
   */
  class CustRecordWriter(val job: TaskAttemptContext) extends RecordWriter[Text, IntWritable] {

    // FileSystem resolved from the job configuration.
    val fs: FileSystem = FileSystem.get(job.getConfiguration)

    // One stream per destination file.
    private val choutputStream: FSDataOutputStream =
      fs.create(new Path("src/main/data/output/ch.txt"))
    private val numoutputStream: FSDataOutputStream =
      fs.create(new Path("src/main/data/output/num.txt"))

    // Called once per reduce output record.
    override def write(key: Text, value: IntWritable): Unit = {
      val regex: Regex = """^\d+$""".r
      // FIX: the original used writeBytes (drops the high byte of each char,
      // corrupting non-ASCII text) and writeUTF (prepends a binary 2-byte
      // length, and wrote no newline). Write raw UTF-8 bytes plus '\n' instead.
      val line = (key.toString + "\n").getBytes("UTF-8")
      if (regex.findFirstMatchIn(key.toString).isDefined) {
        numoutputStream.write(line)
      } else {
        choutputStream.write(line)
      }
    }

    // Close both streams; IOUtils.closeStream is null-safe and swallows IOException.
    override def close(context: TaskAttemptContext): Unit = {
      IOUtils.closeStream(choutputStream)
      IOUtils.closeStream(numoutputStream)
    }
  }
}