大数据技术:第三部分:Hadoop核心组件


HDFS分布式文件系统

分布式文件系统

为什么需要文件系统?

存储设备需要考虑问题:

  • 数据位置
  • 数据长度
  • 空间利用率
  • 快速读取
  • 数据安全
  • 数据清理
  • 空间整理
  • ......

如何以更低的代价满足以上需求?
       操作系统提供中间层;应用开发人员来说,记住文件名和路径,而其他事情转交给中间层。

拥有文件系统带来好处(对用户的使用):

  • 操作方便
  • 管理灵活
  • 安全保障

什么是文件系统及其主要功能职责?

文件系统:基于操作系统,用来管理和组织保存在存储设备上的数据的系统软件。

主要功能职责:

  • 通过对数据存储布局、空间管理、文件命名和安全控制等方面的管理,解决了如何在设备上有效地存储数据的问题;

  • 通过文件系统,实现了数据的完整性,也就是保证了写入磁盘的数据和随后读出的数据的一致性,同时也实现了数据读写的简单化安全性

  • 文件系统除了保存和管理以文件方式存储的数据外,同样也将保存文件以及文件系统自身的一些重要元数据信息。

元数据:文件的权限、大小、修改日期、属性和存储位置等信息。

为什么需要分布式文件系统?

快速存取大数据带来的新需求

  • 单块硬盘的空间大小、存取速度都无法满足需求。

什么是分布式文件系统?

分布式文件系统:文件系统管理的物理存储资源不一定直接连接在本地节点上,而在通过计算机网络与节点相连的集群中。

HDFS总体介绍

HDFS简介

  • HDFS是Apache Hadoop Core项目的核心组成部分

  • HDFS是指被设计成适合运行在通用/廉价硬件上的分布式文件系统。

  • 源自于Google的GFS论文

    • 发表于2003年10月
    • HDFS是GFS克隆版

HDFS的体系结构是一个主从式的结构:

  • 主节点NameNode(NN):1个

  • 从节点DataNode(DN):n个

  • SecondaryNameNode协助主节

HDFS优势

  • 适合批处理

    • 移动计算(非移动数据)
  • 适合大数据处理

    • TB、PB级数据
  • 高容错性

    • 数据保存多个副本
  • 集群规模可拓展

    • 10K节点规模
  • 流式文件访问

    • 适合一次性写入多次读取
  • 可构建在普通廉价机器上

HDFS劣势

  • 低延迟数据访问

    • 毫秒级数据响应访问需求
  • 小文件存取
    寻道时间超过读取时间

  • 并发写入,文件随机修改

    • 一个文件只能有一个写者
    • 仅支持append

HDFS架构解析

HDFS的NameNode

NameNode (NN,主节点)负责:

  • 接收用户的操作请求

  • 管理目录结构与元数据信息

  • 保管文件,block块序列,DataNode节点之间的映射关系

为什么NameNode为什么NameNode需要保存目录和元数据以及各种映射关系?

NameNode是如何维护管理这些信息的?

元数据管理相关的管理文件

Fslmage (File System Image) :元数据镜像文件,保存了最新的元数据检查点。

  • 对于文件来说包括了数据块描述信息、修改时间、访问时间等;

  • 对于目录来说包括修改时间、访问权限、控制信息等。

Editslog:元数据操作日志,记录最新检查点之后针对文件系统做的修改操作记录。

Fslmage + Editslog=当前最新最完整的元数据

HDFS的SecondaryNameNode

SecondaryNameNode作为NameNode的助手

  1. 为元数据提供镜像备份;

  2. 完成镜像Fsimage和Editslog文件的定期合并,再发送给NameNode,缓解解NameNode工作压力。

HDFS的DataNode

DataNode(从节点)
存储数据:在HDFS中的文件是被切分成block块来进行存储的。

HDFS客户端

  • 客户端是用户操作HDFS最常用的方式;

  • HDFS客户端是一个库,暴露了HDFS文件系统接口,这些接口隐藏了HDFS实现中的大部分复杂性;

  • 支持打开、读取、写入等常见的操作,并且提供了类似Shell的命令行方式来访问数据;

  • 提供Java API,作为应用程序访问文件系统的客户端编程接口。

HDFS通讯协议

  • 所有的HDFS通信协议都是构建在TCP/IP协议基础之上的;

  • 客户端通过一个可配置的端口向名称节点主动发起TCP连接,并使用客户端协议与名称节点进行交互

  • 交互

  • 名称节点---数据节点:数据节点协议

  • HDFS客户端--数据节点:RPC

RPC(Remote Procedure Call)


       允许程序调用另一个地址空间(共享网络的另一台机器)的过程或函数,程序员无需显式编码这个远程调用的细节。本地接口/服务和远程接口/服务本质都是相同。

举例:两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数或者方法,由于不在一个内存空间,不能直接调用,这时候就可以应用RPC框架的实现来解决。

Hadoop 1.0版本架构的缺点

  • NameNode的单点故障问题;

  • NameNode的拓展性问题:NameNode含有用户存储文件的全部元数据信息,单NameNode节点内存容量有限;

  • 性能问题:当HDFS中存储大量的小文件时,NameNode需要存储大量的元数据信息,造成内存压力增加;

  • 隔离性问题:单个NameNode不能提供隔离性,某个用户提交的负载很大的job会减慢其他用户的job

Hadoop 2.0组件


HDFS 联邦( federation ):允许系统通过添NameNode实现扩展。

HDFS高可用(high availability):增加备用,防止NameNode单点失效。

Hadoop 2.0组件-HDFS Federation

HDFS Federation设计可解决单名称节点存在的以下几个问题:

  1. HDFS集群扩展性:多个名称节点各自分管一部分目录,使得一个集群可以扩展到更多节点,不再像HDFS1.0中那样由于内存的限制制约文件存储数目;

  2. 性能更高效:多个名称节点管理不同的数据,且同时对外提供服务,将为用户提供更高的读写吞吐率;

  3. 良好的隔离性:用户可根据需要将不同业务数据交由不同名称节点管理,这样不同业务之间影响很小。

需要注意的,HDFS Federation并不能解决单点故障问题,也就是说,每个名称节点都存在在单点故障问题,需要为每个名称节点部署一个后备名称节点,以应对名称节点挂掉对业务产生的影响。

Hadoop 2.0组件-High Availability

ActiveNamenode

Standy NameNode

数据读写过程

HDFS数据读写--写流程

HDFS数据读写--读流程


HDFS容错机制

HDFS容错机制——三类故障

  1. 节点失效
  2. 网络故障
  3. 数据损坏

HDFS容错机制——故障检测机制

  1. 节点失效检测机制

  2. 通信故障检测机制

  3. 错误检测机制

HDFS容错机制——回复信息

HDFS容错机制——读写的容错

  1. 写容错

  2. 读容器

HDFS容错机制——数据节点失效


HDFS容错机制——备份规则

  1. 机架与数据节点

  2. 副本放置策略

HDFS编程实现

基本shell命令

Hadoop Shell命令

Java API

MapReduce编程与应用

MapReduce总体概述

MapReduce出现背景

摩尔定律:CPU性能大约每隔18个月翻一番。

从2005年开始摩尔定律逐渐失效,需要处理的数据量却快速增加,矛盾越来越凸显。

人们开始借助于分布式并行编程来提高程序性能 ,分布式程序运行在大规模计算机集群上,可以并行执行大规模数据处理任务,从而获得海量的计算能力。

MapReduce的起源

与传统分布式计算框架的比较

传统并行计算框架 MapReduce
集群架构/容错性 共享是(共享内存/共享存储),容错性差 非共享式,容错性好
硬件/价格/拓展性 刀片服务器、高速网、SAN,价格贵,扩展性差 普通PC机,便宜,扩展性好
编程/使用难度 what-how,难 what,简单
适用场景 实时、细粒度计算、计算密集型 批处理、非实时、数据密集型

MapReduce三层含义

  • MapReduce是一个并行计算与运行软件框架

    • 提供了一个庞大但设计精良的并行计算软件框架
    • 能自动完成计算任务的并行化处理
    • 能自动划分计算数据和计算任务
    • 在集群节点上自动分配和执行任务以及收集计算结果
    • 将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理
  • MapReduce是一个基于集群的高性能并行计算平台

    • 允许用市场上廉价的计算机构成一个包含数十、数百至数千个节点的分布式并行计算集群
  • MapReduce是一个并行程序设计模型与方法

    • 用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理。

Map 与 Reduce函数

函数 输入 输出 说明
Map $ \left\langle k_{1}, v_{1}\right\rangle $ $ \operatorname{List}\left(\right) $
Reduce $ $ $ $

MapReduce处理流程

MapReduce处理流程动画








MapReduce编程实例

WordCount任务

  • 程序:WordCount

  • 输入:一个包含大量单词的文本文件。

  • 输出:文件中每个单词及其出现次数(频数),并按照单词字母顺序排序,每个单词和其频数各占一行,二者之间有间隔。

WordCount程序执行过程

MapReduce代码编写

Map

public static class MyMapper extends Mapper {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws
            IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

Reduce

public static class MyReducer extends Reducer {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable values, Context
            context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

Main

public static void main(String[]args)throws Exception{
        Configuration conf=new Configuration(); //程序运行时参数
        String[]otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();
        if(otherArgs.length!=2)
        {System.err.println("Usage: wordcount  ");
        System.exit(2);
        }
        Job job=new Job(conf,"word count"); //设置环境参数
        job.setJarByClass(WordCount.class); //设置整个程序的类名
        job.setMapperClass(MyMapper.class); //添加MyMapper类
        job.setReducerClass(MyReducer.class); //添加MyReducer类
        job.setOutputKeyClass(Text.class); //设置输出类型
        job.setOutputValueClass(IntWritable.class); //设置输出类型
        FileInputFormat.addInputPath(job,new Path(otherArgs[0])); //设置输入文件
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[1])); //设置输出文件
        System.exit(job.waitForCompletion(true)?0:1);
}

完整代码

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import javax.naming.Context;
import javax.security.auth.login.Configuration;
import javax.xml.soap.Text;

public class WordCount {
    public static class MyMapper extends Mapper {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class MyReducer extends Reducer {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount  ");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
}

编译打包代码以及运行程序

实验步骤:

  1. 使用java编译程序,生成.class文件

  2. 将.class文件打包为jar包

  3. 运行jar包(需要启动Hadoop)

  4. 查看结果

MapReduce应用解析

MapReduce模型的应用

  • 关系代数运算(选择、投影、并、交、差、连接)

  • 分组与聚合运算

  • 矩阵-向量乘法

  • 矩阵乘法

  • ...

MapReduce模型的应用-表连接