为什么会有大数据
什么是大数据
- 随着近年来计算机技术的不断革新,数字传感器的进步使得通信系统的使用越来越广泛,其中移动平台和移动终端飞速增长,以系统运行产生的大量日志以及企业开展无纸化办公,企业积攒了海量数据资源。
- 摩尔定律告诉我们,大约每两年计算机的性能将历史性提升一倍,可是如今数据处理需求的增长速度已经快于计算机资源处理能力的提升速度。为此人们提出了一种解决办法:数据的并行处理,通过利用多台计算机并行处理这些海量数据。其中Hadoop就是利用互联网的多台计算机使用MapReduce来并行的处理和计算大量的数据。
- 目前大数据可以被定义为无法被符合服务等级协议的单台计算机处理或者存储的任何数据集。理论上单台计算机可以处理任意规模的数据,对于超过单台计算机存储量的海量数据,我们可以存放到类似网络附属存储(NAS)这样的共享存储设备中,然后通过计算机去处理,可是这样处理数据所花费的时间往往会大大的超过允许的数据处理时间。
大数据首要解决问题
目前大数据系统在处理方法上有一些共同特点:
- 数据分布在多个节点(网络I/O速度<<本地磁盘I/O速度)
- 计算程序离数据更近(集群上的点)
- 数据的处理尽量都在本地完成(网络I/O速度<<本地磁盘I/O速度)
- 使用可顺序读取磁盘I/O代替随机读取磁盘I/O(数据交换速度<<数据寻道时间)
所有大数据计算处理模式都有一个目的,就是使输入和输出并行化,从而提高数据处理性能。
大数据分发到多个节点有两个好处:
- 每个数据块会在多个节点上有多份拷贝,这样使得系统具有容错性,当一个节点发生故障,其他节点还备份有故障节点上的数据。
- 为了达到数据并行处理的目的,多个节点同时参与数据处理的过程。至于为什么不存放到网络文件系统中,每个节点去读取它要处理的部分,是因为本地存储磁盘的数据读取速度要远远高于通过网络来读取数据的速度。
大数据无法处理网络过载的问题,如果传输动辄上T的数据,会使网络带宽耗尽,网络拥挤,甚至导致系统故障,为了更好的处理,我们要把数据分布到各个节点上,而且除了把程序要移动到存放数据的节点,程序运行所依赖的函数库也需要移动到节点上,这样大数据可以让我们集中式的部署程序代码,大数据系统后台会在计算任务启动之前把这些程序移动到各个数据处理节点上。
虽然典型的大数据处理系统都希望把数据处理过程放在拥有数据节点的本地完成,但并不是每次都能实现,大数据系统会把计算任务尽量调度到离数据点最近的节点。
大数据运行实例
假设我们要计算2000年美国各州的总销售量,并按州排序,销售数据已经随机分发到各个计算节点,利用大数据技术主要分为如下步骤:
- 每个计算节点读取分发给自己的全部数据,然后过滤掉不是2000年的销售数据,计算节点从本地磁盘顺序读取被随机分发到该节点上的数据,在内存中过滤掉不用的数据,而不是在磁盘上过滤,这样可以避免磁盘的寻道时间。
- 各个计算节点在处理数据的时候,每发现一个新的州,就为它新建一个分组,并把销售数据加到对应的已存在的州分组中。(程序会在各个计算节点运行,分别做本地数据处理)
- 当所有的节点都完成了本地所有数据的磁盘读取工作,按照州编号分别计算其销售总额,它们会分别把各自的计算结果发送到一个特定的计算节点,这个汇聚节点是各个计算节点在计算任务伊始由所有节点协商出来的。
- 这个指定的汇聚节点会从所有计算节点按照编号汇聚全部结果,然后把各个州的来自不同计算节点的数据分别相加。
- 汇聚节点按照州把最终结果排序,并输出排序结果。
大数据的编程模式
- 大规模并行处理(MPP)数据库系统: EMC公司的Greenplum系统和IBM的Netezza系统
- 内存数据库系统: Oracle公司的Exalytics和SAP公司的HANA
- MapReduce系统: 使用广泛的Hadoop
- 整体同步并行系统(BSP): Apache的HAMA
大规模并行处理数据库系统
核心思想是把数据按照某一列或者某一组列的值,按照某种形式划分,以分别处理。但是这系统的缺陷在于需要在算法设计的时候就决定数据如何划分,而划分的准则通常由底层的用例来决定,如此一来,就不适合临时的数据查询需求。
- 数据按州划分,分配到不同的计算节点
- 各个计算节点都拥有程序所需的执行库,并对分配到该节点的数据进行数据处理
- 每个计算节点读取本地数据,一个例外是你未考虑数据的分布情况就进行了数据查询,这时,计算任务会通过网络从其他节点来获取数据。
- 每个任务都是顺序读取数据,所需要的所有数据都存放在磁盘的相邻位置,并且被一次性读取,并在内存中应用过滤条件(year==2000)
内存数据库系统
内存数据库系统类似于MPP系统,他们的不同之处在于内存数据库系统的每个节点都拥有巨大的内存,并且大部分数据会被预先加载到内存中。系统的缺陷是采用了大量的硬件和软件,费用高昂。
- 数据按州划分,各个节点把数据加载到内存中。
- 各个计算节点都拥有程序所需的执行库,并对分配到该节点的数据进行数据处理
- 每个计算节点读取本地数据,一个例外是你未考虑数据的分配情况就进行数据查询请求,这时候,计算任务会从其他节点来获取所需数据。
- 由于数据是被缓存到内存中的,所以除了最初的数据加载到内存的过程外,这里不适用顺序读取数据的特性。
MapReduce系统
Hadoop系统对MapReduce框架的实现具有如下几个重要的特征:
- 使用商用级别的硬件,这个商用硬件的要求不是指笔记本或者台式机,我们可以使用常用的硬件设备来搭建。
- 无需事先定义数据划分准则来把数据分配到各个计算节点
- 用户仅需要定义两个独立的处理过程:Map和Reduce
下面我们介绍下MapReduce大数据系统定义:
- 数据以较大的数据块的形式存放在HDFS上,HDFS是一个分布式文件系统,数据块分散存储到各个节点,数据块是冗余的。
- 程序运行依赖库,包括Map和Reduce代码被复制发送到所有任务节点
- 每个计算节点仅读取节点本地数据,集群中的所有节点运行Mapper,从节点本地读取数据到Mapper中(大多数情况下,哪个节点的Mapper读取哪个节点磁盘的数据库,这是由调度程序管理阶段定,调度程序可能会分配某个节点的Mapper任务来处理远程节点的数据块,以保持集群中的所有节点负载均衡)
- 数据被每个节点的任务以数据块的方式一次性顺序读取
MapReduce编程泛型的一个重要不足是它不适合迭代算法。大量的数据科学计算算法很自然需要使用到迭代,并最终收敛到一个解。当使用这样的算法时候,MapReduce任务每次都要从持久性存储中重新读取数据,所以每次迭代产生的结果需要存到持久性存储中供下次迭代计算使用,这个过程导致了不必要的I/O操作,对系统吞吐量造成重大的影响。
整体同步并行系统
整体同步并行系统和MapReduce过程十分相似,与MapReduce程序不同之处在于,BSP系统程序执行由一系列的超步(这个和Map处理的过程类似)组成,这些超步保持栅栏同步,向主节点发送数据并进行相关的信息交换。每当一次迭代执行完毕,主节点会通知每个数据处理节点进行下一次迭代。
大数据和事物性系统
Hadoop系统使用HBase来作为自己的NoSQL数据存储,大多数的RDBMS使用者都要求数据库必须遵守ACID准则,但是遵守这些准则是需要系统代价的。当数据库后台需要处理峰值为每秒数百万次的事务操作时候,要求苛刻的遵守ACID准则对数据库来说是个巨大的挑战。
对苛刻的ACID准则做出妥协是必要的,做出妥协的主要理论依据就是CAP理论:
- 一致性:在分布式系统的所有数据备份,在同一时刻有同样的值
- 可用性:在合理且明确的时间内,保证每个请求都能获得成功或者失败的结果的响应。
- 分区容忍性:在集群中的一部分节点故障后,集群整体仍可以使用。
这个理论可以证明:任何分布式系统只能同时满足其中的两个特性,而无法三者兼顾。
Hadoop简介
Hadoop是谷歌以2004年发表的一篇关于MapReduce的论文作为基础开发的,就自身来讲,Hadoop是一个基于Java语言的MapReduce框架。随着Hadoop被越来越多的企业采用,自身不断改进并衍生出很多子项目。
- Hadoop Streaming——任何命令行脚本都可以通过Streaming调用MapReduce框架。
- Hadoop Hive——使用MapReduce平台的用户发现,开发一个MapReduce程序常常需要大量的编码工作,而且易于出错且难以测试。Apache Hive可以把海量的数据集放入数据仓库,用户可以编写类似SQL的语句的Hive查询语句来查找数据。Hive引擎把Hive查询语句透明地转换为底层MapReduce任务来执行。高级用户还可以用Java编写自定义函数。
- Hadoop Pig——使用Pig的目的与使用Hive的目的是一样的,但是Hive是一个类似SQL的语言,属于陈述性语言,而Pig是一种过程性语言,非常适合数据管道应用场景。
- Hadoop Hbase——所有之前的项目,包括MapReduce都是批处理程序,并不是实时的查询数据,而Hbase可以在Hadoop中实时查询数据。
MapReduce编程模型简介
MapReduce模型有两个彼此独立的步骤,这两个步骤可以配置并需要用户在程序中自定义:
- Map: 数据初始读取和转换步骤,这个步骤中,每个独立的输入数据记录都进行并行处理
- Reduce: 一个数据整合或者加和的步骤,在这个步骤中,相关联的所有数据记录要放在一个计算节点来处理。
Hadoop系统中MapReduce的核心思路是:将输入的数据在逻辑上分割成多个数据块,每个逻辑数据块被Map任务单独地处理。数据块处理后所得结果会被划分到不同的数据集,且将数据集排序完成。每个经过排序的数据集传输到Reduce任务进行处理。
当数据量不是很大的时候,我们计算文档中每个词出现的个数并不是一件难事。
- 维护一个哈希表,该哈希表的键为文本中的每一个词,该键对应的值是该词的个数
- 把每篇文档加载进内存
- 把文档分隔成一个个的词
- 对于文档中的每个词,更新其在哈希表中的个数
- 当所有的文档都处理完成,我们就得到了所有单词的计数
当数据量非常大的时候,我们可以尝试使用MapReduce来解决这个计数问题
- 假设有一个多台服务器组成的集群供我们使用,假设该集群的计算节点数量为50,每台服务其上都会运行大量的Map处理,假设有一千万个文件,这样就会有一千万个Map处理这些文件,在给定的时间内,我们假设有多少个CPU核,就会有多少个Map在同时进行。集群的服务器是8核的,所以可以有8个Map同事运行,这样每台服务器负责运行20万个Map处理,整个数据处理过程中,每个计算节点都会运行8个Mapper,共25000个迭代(每次迭代过程可以运行8个Mapper,一个CPU运行一个Mapper)
- 每个Mapper处理一个文件,抽取文件的单词,输出<{WORD},1>键值对
- 假设我们只有一个Reduce,这个假设不是必须的,只是默认的设定,实际应用的场景中我们根据需求常常需要改变。
- Reduce接收<{WORD},[1,…,1]>这样的键值对
- Reduce每处理一个相同的单词,就将该单词的计数加1,最终得到单词的总数,然后按照以下键值对格式输出:<{WORD},{单词总数}>
- 最后排序输出结果
Hadoop系统组成:
Hadoop1.x版本系统的组件:
1.名称节点(NameNode):维护着存储在HDFS上的所有文件的元数据信息。这些元数据信息包括组成文件的数据块信息,及这些数据块在数据节点上的位置。
2.辅助名称节点(SecondaryNameNode):这不是名称节点的备份,实际上是Hadoop平台的一个组件,为名称节点组件执行一些内务处理功能。
3.数据节点(DataNode):把真正的数据块存放在本地硬盘上,这些数据块组成了保存在HDFS上的每个文件。
4.作业跟踪器(JobTracker):负责一个任务的整个执行过程,它的具体功能包括:调度各个子任务(Mapper和Redeuce任务各自的子任务)到各自的计算节点运行,时刻监控任务运行和计算节点的健康状况,对失败的子任务重新调度执行。
5.任务跟踪器(TaskTracker):运行在各个数据节点上,用来启动和管理各个Map/Reduce任务,与作业跟踪器通信。
Hadoop分布式文件系统HDFS
Hadoop的文件本质是块存储
对于大数据,我们首要解决的就是大数据的存储问题,这里我们采用HDFS分布式存储的解决方案。HDFS是主从架构,运行名称节点进程的服务器为主节点,运行数据节点进行的服务器为从节点。在Hadoop系统中,每个文件都被分隔成多个数据块,每个数据块的大小为64MB,也可以配置成32MB或者128MB,这些数据块存储在数据节点上,为了防止节点故障,这些数据块是有备份的,系统默认的备份数量是3,具有机架感知功能的Hadoop系统把文件的一个数据块存储在本地机架上的一台计算节点上,第二个备份会存放在另外一个远程机架上的计算节点上,第三个备份会存放在第二次数据块备份机架上的另一台计算节点上。Hadoop系统借助一个单独配置的网络拓扑文件实现机架感知功能,这个网络拓扑文件配置了机架到计算节点的域名系统DNS名称之间的映射,该网络拓扑文件路径配置在Hadoop配置文件中。
文件元数据和名称节点
当客户端想HDFS请求读取或者存储一个文件的时候,它需要知道要访问的数据节点是哪一个,NameNode负责管理所有的文件操作,包括文件/目录的打开、关闭、重命名、移动等等。数据节点就负责存储实际的文件数据,这是一个非常重要的区别,当客户点请求或者发送文件数据,文件的数据在物理上不是经过NameNode传输的,客户端仅仅是简单地从NameNode获取文件的元数据,然后根据其元数据信息直接从数据节点获取文件的数据块。
需要注意的是NameNode并不存储数据节点的身份信息,数据节点的身份信息在集群启动的时候从每个数据节点获取。名称节点维护的信息是:HDFS的文件由哪些数据块(数据节点上每个数据块的文件名组成)
元数据存储在名称节点的本地磁盘上,但是为了快速访问,在集群操作的时候会把这些元信息加载到内存。这个提高了Hadoop系统的操作性能,但是如果Hadoop存储小文件,很容易会使元数据的数据量大幅增长,导致名称节点更大量的内存占用。但是同时成为了Hadoop系统的一个瓶颈,由此衍生出Hadoop2.x
HDFS系统写文件机制
客户端把一个文件写入到HDFS文件那系统需要经过以下几个步骤
- 客户端在联系名称节点之前,会把文件数据流式地读入到客户端本地文件系统中的一个临时文件中
- 当文件数据的大小达到一个数据块的大小时,客户端就联系名称节点
- 名称节点会在HDFS文件系统的层级结构中构建一个文件,然后把数据块的标识符和数据节点上的位置信息发送给客户端。这个数据节点数据块信息列表里面还包括了其备份节点的数据块信息列表。
- 进行完上诉的步骤,客户端就会根据上一步的数据块信息把本地临时文件中的数据刷新到集群上的数据块(只写入到第一个数据节点中)这样,真实的文件数据就放在了集群数据节点本地文件存储系统中。
- 当文件(客户端可以访问的HDFS文件)被关闭时候,名称节点会执行一个提交操作,从而使得该文件在集群中为可见状态。如果在提交操作完成之前名称节点挂掉了,这个文件就丢失了。
HDFS系统读文件机制
1)客户端访问名称节点,名称节点返回组成文件的数据块列表以及数据块的位置(包括备份数据块的位置)
2)客户端会直接访问数据节点以获取数据块中的数据。如果此时其访问数据节点出现古战,就会访问存在备份数据块的数据节点
3)读取数据块的时候会计算该数据块的校验和,并将该校验和与写入文件时候的校验和作比较,如果检验失败,则从其他数据节点获取备份数据块。
HDFS系统删除文件机制
1)名称节点仅仅重命名了文件路径,使其移动到/trash目录,需要注意的是,这个操作过程是链接到重命名文件路径的元数据的更新操作。这个执行过程非常迅速,/trash目录中的文件会保存一段时间,这个保存时间是预先确定的(当前设定为6小时而且当前不可配置)在这段时间内,把删除的文件从/trash目录中移动出来即可迅速地恢复该文件。
2)当/trash目录中的文件超过了保存时间,名称节点就会将该文件从HDFS命名空间删除。
3)删除文件就会使得改文件相关的数据块被释放,HDFS系统最后会显示增加了一些空闲的空间。‘
辅助名称节点
辅助名称节点的作用就是周期性地把edit文件中的内容与fsimage文件中的内容合并,辅助名称节点会周期性地顺序执行下列步骤:
1)辅助名称节点会请求名称节点来结转文件,确保新的更新保存到一个新的文件,这个新的文件名字叫做edits.new
2)辅助名称节点向名称节点请求获取fsimage文件和edits文件
3)辅助名称节点把edits文件和fsimage文件合并,生成一个新的fsimage文件
4)名称节点从辅助名称节点接收到新生成的fsimage文件,并替代旧的fsimage文件。同时将edits文件中的内容替换成步骤1中创建的edit.new文件的内容。
5)更新fstime文件来记录发生的检查点操作
任务跟踪器和作业跟踪器
任务跟踪器守护进程在集群中每台计算节点运行,接收诸如Map和Reduce和Shuffle这些操作任务的请求,每个任务跟踪器都会分配一定的槽位数,其槽位数的数量一般与计算节点上可用的CPU核数一致,任务跟踪器接收到一个来自作业跟踪器的请求后,就会启动一个任务,任务跟踪器会为这个任务初始化一个新的JVM。
作业跟踪器守护进程负责启动和监控MapReduce作业,当一个客户端向Hadoop系统提交一个作业,作业的启动流程如图:
1)作业跟踪器收到了作业请求
2)大多数的MapReduce作业都需要一个或多个输入文件目录,任务跟踪器向名称节点发出请求,获得一个数据节点的列表。这个列表的数据节点存储了组成输入文件数据的数据块。
3)作业跟踪器为作业的执行做准备工作。在这个步骤中,任务跟踪器确定执行该作业需要的任务(Mapper和Reducer任务)数量。作业跟踪器尽量把这些任务都调度到离数据块最近的位置进行。
4)作业跟踪器把任务提交到每个任务跟踪器节点去执行。任务跟踪器节点监控任务执行情况。任务跟踪器以预先设定的时间间隔发送心跳信息到作业跟踪器,如果作业跟踪器在预先设定的时间间隔之后,没有收到任务跟踪器发来的心跳信息,那么就认为该任务跟踪节点出现故障,任务就会被调度到另外一个节点去运行。
5)一旦所有任务都执行完毕,作业跟踪器就会更新作业状态为成功,如果任务反复失败达到一定数量,作业跟踪器就会宣布作业运行失败、
6)客户端会轮询作业跟踪器及时地获得作业运行状态、
Hadoop2.0
MapReduce已经进行了全新升级,升级后的版本被称为MapReduce2.0或者YARN,YARN是一套应用编程接口,兼容MRV1,Hadoop1.x中的作业调度器承担两个主要功能:
- 资源管理
- 作业调度/作业监控
YARN把这两个功能分为两个守护进程来分别承担,这样的设计使得系统有一个全局的资源管理器以及每个程序有一个应用程序管理器。注意这里我们提到了程序而不是作业,在新的系统中,一个程序既可以指传统概念上的一个单独的MapReduce作业,也可以指一系列作业组成的有向无环图(DAG)
YARN系统由一下几个组成部分:
- 全局资源管理器
- 针对每种应程序的应用程序管理器
- 调度器
- 容器
容器:是YARN框架中的计算单元。一部分CPU内核和一部分内存构成了容器,一个应用程序运行在一组容器当中。它是一个任务进行工作得单元子系统,也可以认为YARN框架中的容器相当于MapReduce v1中的一个任务执行器。集群节点与容器之间的关系是:一个节点可以运行多个容器,但是一个容器只能运行在一个节点之内。应用程序管理器的一个实例会向全局资源管理请求获取资源。调度器会通过每个节点的节点管理器来分配资源(容器)。节点管理器会向全局资源管理器汇报每个容器的使用情况。
节点管理器:运行在集群中的一个节点上,集群中的每一个节点都会运行一个自己的节点管理器,它是一个从属服务,它接收来自资源管理器的请求,然后分配容器给应用程序。它还负责监控和汇报资源使用情况给资源管理器。节点管理器的任务如下:
- 接收来自资源管理器的请求,为作业分配容器
- 与资源管理器交换信息,确保整个集群的稳定运行。资源管理器依靠各个节点管理器的汇报来跟踪整个集群的健康状况,节点管理器作为代理任务来监控和管理本节点的健康状况。
- 管理每个已启动的容器的整个生命周期
- 每个节点的日志管理
- 运行各种YARN应用程序使用的辅助服务。
资源管理器:核心是个调度,当多个应用程序竞争使用集群资源的时候,它来负责资源的调度,确保集群资源的优化合理使用。资源管理有一个插件化的调度器,该调度器按程序队列和集群的处理能力,负责为正在运行的多个应用程序分配其所需的集群资源。
应用程序管理器:是一个特性的框架函数库实例,同资源管理器协调沟通资源,并通过节点管理器来获取这些系统资源,然后执行任务。可以提高拓展性并且框架更加的通用。
YARN的运行图
HDFS具有的高可用性
通过HDFS的系统介绍,在Hadoop1.x的系统中,名称节点会引发系统单点故障,如果运行名称节点的服务器出现故障了,那么整个集群都会处于不可用的状态,除非名称节点在另外一台服务器上重新启动。
Hadoop2.x引入了高可用名称节点的概念,高可用名称节点的背后核心思想是使用两个相同的名称节点,一个处于活动模式,另外一个处于待机模式,处于活动模式的名称节点对系统提供服务,处在待机模式的名称节点需要实时同步活动名称节点的数据,一旦活动名称节点当机,系统可以快速的进行故障切换。在当前的设计中,为了这个目的,两个名称节点必须共享同一个存储设备(通过NFS),活动名称节点的任何修改都会记录到共享存储设备当中的edits日志文件中。待机名称节点将这些修改应用到自己的名称空间中红,一旦活动名称节点发生故障,待机名称节点会确保edits文件中的所有数据都被应用,并接管活动名称节点的职责。
认识Hadoop框架
安装类型
- 单机模式:适合调试,这种模式下,Hadoop系统的所有程序运行在一个单独的JVM中,从系统性能的角度看,这种性能最差,当时开发过程是最高效的。
- 伪分布式集群模式 :所有守护进程分别运行在不同的Java进程中,这样的运行模式用来模拟集群环境。
- 多节点集群模式:从系统逻辑上,其系统运行情况与伪分布模式是一致的。
具体的安装教程可以查看厦门大数据实验室写的教程:Hadoop安装教程
一个MapReduce程序的组成
- Java程序客户机:一个Java程序,由集群中的一个客户端节点(由集群中的一个数据节点来充当,该节点是集群中的一台机器,并且有权限访问Hadoop)
- 自定义Mapper类:这个类是一个用户自定义类,这个类的实例会在远程任务节点上执行,这些任务节点往往与用来提交作业程序的客户端节点不同。
- 自定义Reduce类:与Mapper类一样,如果不是伪分布式下运行,这个类的实例会在远程任务节点上执行。
- 客户端函数库:客户端函数库不同Hadoop系统的标准函数库,这个函数库是在客户端运行期间使用的。
- 远程函数库:这个函数库是用户自定义Mapper和Reduce类所需要的。
- Java程序档案文件:Java程序以JAR文件的形式打包,还包括客户端Java类,Mapper类和Reducer类用到的其他自定义依赖类。
第一个Hadoop程序
使用旧API编写wordcount
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class WordCount{
public static class MyMapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>{
public void map(LongWritable key,Text value,OutputCollector<Text,IntWritable>output,Reporter reporter)throws IOException{
output.collect(new Text(value.toString()),new IntWritable(1));
}
}
public static class MyReducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable>{
public void reduce(Text key,Iterator<IntWritable>values,OutputCollector<Text,IntWritable>output,Reporter reporter) throws IOException{
int sum=0;
while(values.hasNext()){
sum+=values.next().get();
}
output.collect(key,new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception{
JobConf conf=new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MyMapper.class);
conf.setCombinerClass(MyReducer.class);
conf.setReducerClass(MyReducer.class);
conf.setNumReduceTasks(1);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf,new Path(args[0]));
FileOutputFormat.setOutputPath(conf,new Path(args[1]));
JobClient.runJob(conf);
}
}
使用新API编写wordcount
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordTest{
public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String w=value.toString();
context.write(new Text(w),new IntWritable(1));
}
}
public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
public void reduce(Text key,Iterable<IntWritable>values,Context context) throws IOException,InterruptedException{
int sum=0;
for(IntWritable val:values){
sum+=val.get();
}
context.write(key,new IntWritable(sum));
}
}
public static void main(String[] args)throws Exception{
Job job=Job.getInstance(new Configuration());
job.setJarByClass(WordTest.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean status=job.waitForCompletion(true);
if(status){
System.exit(0);
}
else{
System.exit(1);
}
}
}
Hadoop系统管理
Hadoop集群中的每台计算节点都有自己的一组配置文件,有两种主要类型的配置文件:-default.xml和-site.xml,site中配置项覆盖default中相同的配置项。
- core-default.xml ——默认的核心Hadoop属性配置文件,位于JAR文件中:hadoop-common-*.jar
- hdfs-default.xml ——默认的HDFS属性配置文件,位于JAR文件中:hadoop-hdfs-*.jar
- mapred-default.xml ——默认的MapReduce属性配置文件,位于JAR文件中:hadoop-mapreduce-client-core-*.jar
- yarn-default.xml ——默认的Yarn属性配置文件,位于JAR文件中:hadoop-yarn-common-*.jar
- core-site.xml——特定的通用Hadoop属性配置文件,会覆盖core-default.xml文件中的相同配置项。
- hdfs-site.xml——特定的通用HDFS属性配置文件,会覆盖hdfs-default.xml中的相同配置项。
- mapred-site.xml——特定的MapReduce配置屋内按,会覆盖mapred-default.xml中相同配置项。
- yarn.xml——特定的YARN属性配置文件,会覆盖mapred-default.xml中相同的配置项。
配置Hadoop守护进程
- hadoop-env.sh
- yarn-env.sh
- mapred-env.sh
负责设置一下属性:
- Java主目录
- 用于各种守护进程的JVM选项