Monitoring a Hadoop Production Cluster: Counters

  You can instrument a Hadoop job with counters to analyze its overall operation. Define a separate counter in your program for each kind of event and increment it each time that event occurs. For counters of the same name coming from all tasks of a job, Hadoop automatically sums their values, so the totals reflect the job as a whole. These counter values are displayed in the JobTracker web UI alongside Hadoop's built-in counters.

  A typical use of counters is to track different kinds of input records, and in particular to track "bad" records. For example, suppose we are given a dataset in the following format (only an excerpt is shown):

"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"
3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,
3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,
3070805,1963,1096,,"US","CA",,1,,2,6,63,,1,,0,,,,,,,
3070806,1963,1096,,"US","PA",,1,,2,6,63,,0,,,,,,,,,
3070807,1963,1096,,"US","OH",,1,,623,3,39,,3,,0.4444,,,,,,,
3070808,1963,1096,,"US","IA",,1,,623,3,39,,4,,0.375,,,,,,,
3070809,1963,1096,,"US","AZ",,1,,4,6,65,,0,,,,,,,,,
3070810,1963,1096,,"US","IL",,1,,4,6,65,,3,,0.4444,,,,,,,
3070811,1963,1096,,"US","CA",,1,,4,6,65,,8,,0,,,,,,,
3070812,1963,1096,,"US","LA",,1,,4,6,65,,3,,0.4444,,,,,,,
3070813,1963,1096,,"US","NY",,1,,5,6,65,,2,,0,,,,,,,
3070814,1963,1096,,"US","MN",,2,,267,5,59,,2,,0.5,,,,,,,
3070815,1963,1096,,"US","CO",,1,,7,5,59,,1,,0,,,,,,,
3070816,1963,1096,,"US","OK",,1,,114,5,55,,4,,0,,,,,,,
3070817,1963,1096,,"US","RI",,2,,114,5,55,,5,,0.64,,,,,,,
3070818,1963,1096,,"US","IN",,1,,441,6,69,,4,,0.625,,,,,,,
3070819,1963,1096,,"US","TN",,4,,12,6,63,,0,,,,,,,,,
3070820,1963,1096,,"GB","",,2,,12,6,63,,0,,,,,,,,,
3070821,1963,1096,,"US","IL",,2,,15,6,69,,1,,0,,,,,,,
3070822,1963,1096,,"US","NY",,2,,401,1,12,,4,,0.375,,,,,,,
3070823,1963,1096,,"US","MI",,1,,401,1,12,,8,,0.6563,,,,,,,
3070824,1963,1096,,"US","IL",,1,,401,1,12,,5,,0.48,,,,,,,
3070825,1963,1096,,"US","IL",,1,,401,1,12,,7,,0.6531,,,,,,,
3070826,1963,1096,,"US","IA",,1,,401,1,12,,1,,0,,,,,,,
3070827,1963,1096,,"US","CA",,4,,401,1,12,,2,,0.5,,,,,,,
3070828,1963,1096,,"US","CT",,2,,16,5,59,,4,,0.625,,,,,,,
3070829,1963,1096,,"FR","",,3,,16,5,59,,5,,0.48,,,,,,,
3070830,1963,1096,,"US","NH",,2,,16,5,59,,0,,,,,,,,,
3070831,1963,1096,,"US","CT",,2,,16,5,59,,0,,,,,,,,,
3070832,1963,1096,,"US","LA",,2,,452,6,61,,1,,0,,,,,,,
3070833,1963,1096,,"US","LA",,1,,452,6,61,,5,,0,,,,,,,
3070834,1963,1096,,"US","FL",,1,,452,6,61,,3,,0.4444,,,,,,,
3070835,1963,1096,,"US","IL",,2,,264,5,51,,5,,0.64,,,,,,,
3070836,1963,1096,,"US","OK",,2,,264,5,51,,24,,0.7569,,,,,,,
3070837,1963,1096,,"CH","",,3,,264,5,51,,7,,0.6122,,,,,,,
3070838,1963,1096,,"CH","",,5,,425,5,51,,5,,0.48,,,,,,,
3070839,1963,1096,,"US","TN",,2,,425,5,51,,8,,0.4063,,,,,,,
3070840,1963,1096,,"GB","",,3,,425,5,51,,6,,0.7778,,,,,,,
3070841,1963,1096,,"US","OH",,2,,264,5,51,,6,,0.8333,,,,,,,
3070842,1963,1096,,"US","TX",,1,,425,5,51,,1,,0,,,,,,,
3070843,1963,1096,,"US","NY",,2,,425,5,51,,1,,0,,,,,,,
3070844,1963,1096,,"US","OH",,2,,425,5,51,,2,,0,,,,,,,
3070845,1963,1096,,"US","IL",,1,,52,6,69,,3,,0,,,,,,,
3070846,1963,1096,,"US","NY",,2,,425,5,51,,9,,0.7407,,,,,,,

We want to compute the average number of claims per patent for each country, but many records are missing the claims count. Our program will simply skip those records, and it is useful to know how many were skipped. Beyond satisfying our curiosity, this kind of instrumentation helps us understand how the program operates and gives us a basic check on its correctness.

  Counters are used through the Reporter.incrCounter() method. A Reporter object is passed to both the map() and reduce() methods. Call incrCounter() with a counter name and an increment amount; each distinct event gets its own named counter. The first time incrCounter() is called with a new counter name, that counter is created and initialized; subsequent calls accumulate further increments.

  Reporter.incrCounter() has two signatures:

public void incrCounter(String group, String counter, long amount)
public void incrCounter(Enum key, long amount)
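
  Both forms can be mixed freely: the enum form derives the counter group from the enum's class name, while the string form lets you name the group and counter explicitly. A minimal sketch of a mapper using both forms (the group and counter names here are made up for illustration, not part of the example below):

// Enum-based counter: the group name comes from the enum class.
static enum RecordCounters { MALFORMED };

public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
    if (value.toString().split(",", -1).length < 9) {
        // Signature 1: enum key plus increment.
        reporter.incrCounter(RecordCounters.MALFORMED, 1);
        // Signature 2: explicit group name, counter name, and increment.
        reporter.incrCounter("Input Quality", "Malformed lines", 1);
        return;
    }
    // ... normal processing of well-formed records ...
}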

  Below is the code for computing the average number of claims per country, instrumented with these counters:

package hadoop.in.action;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class AverageByAttribute {

    public static class MapClass extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {

        static enum ClaimsCounters {
            MISSING, QUOTED
        };

        private Text k = new Text();
        private Text v = new Text();

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            String[] fields = value.toString().split(",", -1);
            String country = fields[4];
            String numClaims = fields[8];
            if (numClaims.length() == 0) {
                reporter.incrCounter(ClaimsCounters.MISSING, 1);
            } else {
                if (numClaims.startsWith("\"")) {
                    reporter.incrCounter(ClaimsCounters.QUOTED, 1);
                } else {
                    k.set(country);
                    v.set(numClaims + ",1");
                    output.collect(k, v);
                }
            }

        }

    }

    public static class CombineClass extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {

        private Text v = new Text();

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            int count = 0;
            double sum = 0;
            while (values.hasNext()) {
                String[] fields = values.next().toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
                v.set(sum + "," + count);
                output.collect(key, v);
            }
        }

    }

    public static class ReduceClass extends MapReduceBase implements
            Reducer<Text, Text, Text, DoubleWritable> {

        private DoubleWritable v = new DoubleWritable();

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {

            int count = 0;
            double sum = 0;
            while (values.hasNext()) {
                String[] fields = values.next().toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            v.set(sum / count);
            output.collect(key, v);
        }

    }

    public static void run() throws IOException {

        Configuration configuration = new Configuration();
        JobConf jobConf = new JobConf(configuration, AverageByAttribute.class);

        String input = "hdfs://localhost:9000/user/hadoop/input/apat63_99.txt";
        String output = "hdfs://localhost:9000/user/hadoop/output";

        // HDFSDao hdfsDao = new HDFSDao(configuration);
        // hdfsDao.rmr(output);

        FileInputFormat.setInputPaths(jobConf, new Path(input));
        FileOutputFormat.setOutputPath(jobConf, new Path(output));

        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(TextOutputFormat.class);

        jobConf.setMapOutputKeyClass(Text.class);
        jobConf.setMapOutputValueClass(Text.class);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(DoubleWritable.class);

        jobConf.setMapperClass(MapClass.class);
        jobConf.setCombinerClass(CombineClass.class);
        jobConf.setReducerClass(ReduceClass.class);

        // JobClient.runJob() submits the job and blocks until it finishes,
        // so no additional polling loop is needed.
        RunningJob runningJob = JobClient.runJob(jobConf);

    }

    public static void main(String[] args) throws IOException {

        run();

    }

}

  After the program runs, both the counters we defined and Hadoop's built-in counters are displayed in the JobTracker web UI.
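
  The same counter totals can also be read programmatically from the RunningJob handle once JobClient.runJob() has returned. A minimal sketch, which could be placed at the end of the run() method above (the printed labels are only illustrative):

// requires: import org.apache.hadoop.mapred.Counters;

// After JobClient.runJob(jobConf) has returned, the job is complete
// and its aggregated counters are available.
Counters counters = runningJob.getCounters();
long missing = counters.getCounter(MapClass.ClaimsCounters.MISSING);
long quoted  = counters.getCounter(MapClass.ClaimsCounters.QUOTED);
System.out.println("Records with missing claim counts: " + missing);
System.out.println("Records with quoted claim counts:  " + quoted);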
