9.1hadoop 内置计数器、自定义枚举计数器、Streaming计数器

1.1  计数器

计数器的作用是用来统计数量的,用于记录特定事件的次数,分为内置计数器、自定义java枚举计数器、自定义Stream计数器三大类。用于质量分析,或应用级统计。分析计数器的值比分析一堆日志更高效。


计数器名称


计数器介绍


内置计数器


Hadoop自带的计数器,有特定的计数器名称。例如统计输入、输出的记录数量,输入输出的字节数。


自定义java枚举计数器


用户自定义的枚举型计数器,用于统计用户的特殊要求的计数器,例如统计记录中无效记录的数量。


自定义Streaming计数器


通过向标准输出流发送特定格式的信息,来增加指定计数器的数值。Streaming是hadoop工具,用于执行非java的map和reduce作业。

1.1.1         内置计数器


组别


名称/类别


参考


MapReduce任务计数器


org.apache.hadoop.mapreduce.TaskCounter


Map和reduce的任务统计


文件系统计数器


org.apache.hadoop.mapreduce.FiIeSystemCounter


文件系统读取写入统计


FiIeInputFormat计数器


org.apache.hadoop.mapreduce.lib.input.FilelnputFormatCounter


Map任务通过FilelnputForma读取数据的数量


FiIeOutputFormat计数器


org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter


map任务(针对仅含map的作业)或者reduce任务通过FileOutputFormat写的字节数


作业计数器


org.apache.hadoop.mapreduce.JobCounter


从作业的维度统计的数值

(1)   任务计数器

MAP_INPUT_RECORDS计数器统计map任务输入记录的总数,一个作业的所有map任务进行聚集,定期发送给application master,全量发送更新。可以统计任务输入、输出的记录数量,输入、输出的字节数,分片(split)的原始字节数(SPLIT_RAW_BYTES)等

表内置的mapreduce任务计数器


计数器名称


说明


map输人的记录数(MAP_INPUT_RECORDS)


作业中所有map已处理的输人记录数。每次RecordReader读到一条记录并将其传给map的map()函数时,该计数器的值递增


分片(split)的原始字节数(SPLIT_RAW_BYTES)


由map读取的输人-分片对象的字节数。这些对象描述分片元数据(文件的位移和长度),而不是分片的数据自身,因此总规模是小的


map输出的记录数(MAP_OUTPUT_RECORDS)


作业中所有map产生的map输出记录数。每次某一个map
的OutputCollector调用collect()方法时,该计数器的值增加


map输出的字节数(MAP_OUTPUT_BYTES)


作业中所有map产生的耒经压缩的输出数据的字节数·每次某一个map的OutputCollector调用collect()方法时,该计数器的值增加


map输出的物化字节数(MAP_OUTPUT_MATERIALIZED_BYTES)


map输出后确实写到磁盘上的字节数;若map输出压缩功能被启用,则会在计数器值上反映出来


combine输人的记录数(COMBINE_INPUT_RECORDS)


作业中所有combiner(如果有)已处理的输人记录数。combiner的迭代器每次读一个值,该计数器的值增加。注意:本计数器代表combiner已经处理的值的个数,并非不同的键组数(后者并无实所意文,因为对于combiner而言,并不要求每个键对应一个组,详情参见2.4.2节和7.3节


combine输出的记录数(COMBINE_OUTPUT_RECORDS)


作业中所有combiner(如果有)已产生的输出记录数。每当一个combiner的OutputCollector调用collect()方法时,该计数器的值增加


reduce输人的组(REDUCE_INPUT_GROUPS)


作业中所有reducer已经处理的不同的码分组的个数。每当某一个reducer的reduce()被调用时,该计数器的值增加


reduce输人的记录数(REDUCE_INPUT_RECORDS)


作业中所有reducer已经处理的输人记录的个数。每当某个reducer的迭代器读一个值时,该计数器的值增加。如果所有reducer已经处理数完所有输人,則该计数器的值与计数器"map输出的记录"的值相同


reduce输出的记录数(REDUCE_OUTPUT_RECORDS)


作业中所有map已经产生的reduce输出记录数。每当某个reducer的OutputCollector调用collect()方法时,该计数器的值增加


reduce经过shuffle的字节数(REDUCE_SHUFFLE_BYTES)


由shume复制到reducer的map输出的字节数


溢出的记录数(SPILLED_RECORDS)


作业中所有map和reduce任务溢出到磁的记录数


CPU毫秒(CPU_MILLISECONDS)


一个任务的总CPU时间,以毫秒为单位,可由/proc/cpuinfo获取


物理内存字节数(PHYSICAL_MEMORY_BYTES)


一个任务所用的物理内存,以字节数为单位,可 由/proc/meminfo获取


虚拟内存字节数(VIRTUAL_MEMORY_BYTES)


一个任务所用虚拟内存的字节数,由/proc/meminfo而‘面获取


有效的堆字节数(COMMITTED_HEAP_BYTES)


在JVM中的总有效内存最(以字节为单位),可由Runtime.
getRuntime().totalMemory()获取


GC运行时间毫秒数(GC_TIME_MILLIS)


在任务执行过程中,垃圾收集器(garbage collection)花费的时间(以毫秒为单位),可由GarbageCollector MXBean.
getCollectionTime()获取


由shuffle传输的map输出数(SHUFFLED_MAPS)


由shume传输到reducer的map输出文件数,详情参见7.3节


失敗的shuffle数(FAILED_SHUFFLE)


shuffle过程中,发生map输出拷贝错误的次数


被合并的map输出数(MERGED_MAP_OUTPUTS)


shuffle过程中,在reduce端合并的map输出文件数


内置的文件系统任务计数器


计数器名称


说明


文件系统的读字节数(BYTES_READ)


由map任务和reduce任务在各个文件系统中读取的字节数,各个文件系统分别对应一个计数器,文件系统可以是ocal、
HDFS、S3等


文件系统的写字节数(BYTES_WRITTEN)


由map任务和reduce任务在各个文件系统中写的字节数


文件系统读操作的数量(READ_OPS)


由map任务和reduce任务在各个文件系统中进行的读操作的数量(例如,open操作,filestatus操作)


文件系统大规模读操作的数最(LARGE_READ_OPS)


由map和reduce任务在各个文件系统中进行的大规模读操作(例如,对于一个大容量目录进行list操作)的数


文件系统写操作的数最(WRITE_OPS)


由map任务和reduce任务在各个文件系统中进行的写操作的数量(例如,create操作,append操作)


内置的FilelnputFormat计数器


计数器名称


说明


读取的字节数(BYTES_READ)


由map任务通过FilelnputFormat读取的字节数


内置的FileOutputFormat任务计数器


计数器名称


说明


写的字节数(BYTES_WRITTEN)


由map任务(针对仅含map的作业)或者reduce任务通过FileOutputFormat写的字节数

(2)   作业计数器

作业计数器有application master维护,作业级别统计,值不会随着任务的执行而变化,例如TOTAL_LAUNCHED_MAPS统计作业执行的任务数。


计数器名称


说明


启用的map任务数(TOTAL_LAUNCHED_MAPS)


启动的map任务数,包括以“推测执行”方式启动的任务,详情参见7.4.2节


启用的reduce任务数(TOTAL_LAUNCHED_REDUCES)


启动的reduce任务数,包括以“推测执行”方式启动的任务


启用的uber任务数(TOTAL_LAIÆHED_UBERTASKS)


启用的uber任务数,详情参见7.1节


uber任务中的map数(NUM_UBER_SUBMAPS)


在uber任务中的map数


Uber任务中的reduce数(NUM_UBER_SUBREDUCES)


在任务中的reduce数


失败的map任务数(NUM_FAILED_MAPS)


失败的map任务数,用户可以参见7.2.1节对任务失败的讨论,了解失败原因


失败的reduce任务数(NUM_FAILED_REDUCES)


失败的reduce任务数


失败的uber任务数(NIN_FAILED_UBERTASKS)


失败的uber任务数


被中止的map任务数(NUM_KILLED_MAPS)


被中止的map任务数,可以参见7.2.1节对任务失败的讨论,了解中止原因


被中止的reduce任务数(NW_KILLED_REDUCES)


被中止的reduce任务数


数据本地化的map任务数(DATA_LOCAL_MAPS)


与输人数据在同一节点上的map任务数


机架本地化的map任务数(RACK_LOCAL_MAPS)


与输人数据在同一机架范围内但不在同一节点上的map任务数


其他本地化的map任务数(OTHER_LOCAL_MAPS)


与输人数据不在同一机架范围内的map任务数。山于机架之间的带宽资源相对较少,Hadoop会尽量让map任务靠近输人数据执行,因此该计数器值一般比较小。详情参见图2-2


map任务的总运行时间(MILLIS_MAPS)


map任务的总运行时间,单位毫秒。包括以推测执行方式启动的任务。可参见相关的度量内核和内存使用的计数器(VCORES_MILLIS_MAPS和MB_MILLIS_MAPS)


reduce任务的总运行时间(MILLIS_REDUCES)


reduce任务的总运行时间,单位毫秒。包括以推滌执行方式启动的任务。可参见相关的度量内核和内存使用的计数器(VCORES _MILLIS_REDUCES和MB_MILLIS_REDUCES)

1.1.2        
自定义java计数器

计数器由java枚举类型来定义,以便进行分组,枚举名称即为组名,字段即为计数器,计数器为全局的,mapreduce框架跨所有map和reduce聚集这些计数器。

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import
org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import
org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

//统计最高气温的作业。也统计气温值缺少的记录,不规范的记录

public class MaxTemperatureWithCounters
extends Configured implements Tool {

//自定义计数器

enum Temperature {

              MiSSING, //统计气温缺失的记录

MALFORMED//统计不规则的记录

       }

//定义maper

static
class MaxTemeratureMapperWithCounters extends MapReduceBase implements

Mapper<LongWritable,
Text, Text, IntWritable> {

private
NcdcRecordParser parser = new NcdcRecordParser();

@Override

public
void map(LongWritable key, Text value,

OutputCollector<Text,
IntWritable> output, Reporter reporter)

throws
IOException {

parser.parse(value);

if
(parser.isValidTemperature()) {

int
airTemperature = parser.getAirTemperature();

output.collect(new
Text(parser.getYear()), new IntWritable(

airTemperature));

}
else if (parser.isMa1formedTemperature()) {

//增加计数器的值                  //context.getCounter(Temperature.MALFORMED).increment(1);

                            // Reporter是MapReduce提供给应用程序的工具。可使用Reporter中的方法报告完成进度(progress)、设定状态消息(setStatus)以及更新计数器(incrCounter)

reporter.incrCounter(Temperature.MALFORMED, 1);

}
else if (parser.IsMissingTemperature()) {

//context.getCounter(Temperature.MISSING).increment(1);

                            reporter.incrCounter(Temperature.
MISSING, 1);

}

//动态计数器     context.getCounter(“TemperatureQuality”,parse.getQuality()).increment(1);

}

}

//定义reduce

static
class MaxTemperatureReduceWithCounters extends MapReduceBase implements

Reducer<Text,
IntWritable, Text, IntWritable> {

public
void reduce(Text key, Iterator<IntWritable> values,

OutputCollector<Text,
IntWritable> output, Reporter reporter)

throws
IOException {

int
maxValue = Integer.MIN_VALUE;

while
(values.hasNext()) {

maxValue
= Math.max(maxValue, values.next().get());

}

output.collect(key,
new IntWritable(maxValue));

}

}

@Override

public
int run(String[] args) throws Exception {

args
= new String[] { "/test/input/t", "/test/output/t" }; // 给定输入输出路径

JobConf
conf = JobBuilder.parseInputAndOutput(this, getConf(), args);

if
(conf == null) {

return
-1;

}

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(MaxTemeratureMapperWithCounters.class);

conf.setCombinerClass(MaxTemperatureReduceWithCounters.class);

conf.setReducerClass(MaxTemperatureReduceWithCounters.class);

JobClient.runJob(conf);

return
0;

}

public
static void main(String[] args) throws Exception {

int
exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args);

System.exit(exitCode);

}

}

执行任务,-counters参数,会输出所有计数器的值

hadoop
jar Hadoop-examples.jar MaxTemperatureWithCounter input/ncdc/all output -counters

(1)动态计数器   

动态计数器不像枚举型计数器需要提前定义组名和计数器类型,而是通过字符串名称动态的创建计数器。枚举类型计数器传入枚举类型也要转成String,所以两种方法时等价的,枚举型简单安全。

context.getCounter(String
groupName,String counterName);

context.getCounter(“TemperatureQuality”,parse.getQuality()).increment(1);

(2)获取计数器

或者用javaAPI获取计数器值。根据配置信息创建cluster对象,根据jobid获取job,获取job的计数器,根据类路径获取计数器的值,counters.findCounter(MaxtemperatureCounters.Temperature.MISSING).getValue();方法。

public
class MissingTemperatureFields extends Configured implement Tool{

@override

public
int run(String[] args)throws Exception{

if(args.length()!=1)

{

return -1;

}

String jobID=args[0];

//根据配置信息创建cluster对象

Cluster cluster =new
Cluster(getConf());

//根据jobid获取job

Job job=cluster.getJob(JobID.forName(jobID));

if(job==null)

{

System.err.printf(“NO
job with ID %s”,jobID);

return
-1;

}

if(!job.isComplete())

{

System.err.printf(“job
ID %s is not complete”,jobID);

return
-1;

}

获取job的计数器,

Counters counters=job.getCounters();

//根据路径获取计数器的值

long missing=counters.findCounter(MaxtemperatureCounters.Temperature.MISSING).getValue();

long total=counters.finCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();

//计算任务的失败率

System.out.printf(“Records missing present %.2f/%%\n”,100.0*missing/total);

}

public
static void main(String[] args)throw Exception{

int
exitCode=ToolRunner.run(new MissingTemperatureFields(),args);

System.exit(exitCode);

}

}

执行hadoop任务,传入jobid参数

%hadoop
jar hadoop-example.jar MissingTempratureFields job_11223131_0007

1.1.3        
用户自定义的Streaming计数器

Hadoop
streaming是hadoop的一个工具,用于运行非java的maper或reducer作业,例如maper和reducer是C++编写的可执行程序或者脚本文件。使用Streaming的mapreduce程序可以像保准错误流发送特殊格式的信息,增加计数器的值格式如下:reporter:counter:group.counter,amount

Python实例如下

sys.stderr.write(“reporter:counter:Temperature,Missing,1
\n”);

状态信息发送格式如下

sys.stderr.write(“reporter:status:message”);

1.1.4        
获取计数器的方法汇总

1)web界面查看计数器值

2)命令行mapred job –counter查看计数器的值;

3)动态计数器用context获取,context.getCounter (“TemperatureQuality”,parse.getQuality()).increment(1);

4)用context的getCounter方法或者用reportor的incrCounter方法context.getCounter(Temperature.MISSING).increment(1);reporter.incrCounter(Temperature.
MISSING, 1);

5)Streaming程序,即非java的mapreduce程序,通过向标准输出发送固定格式的数据来增加计数器的值。sys.stderr.write(“reporter:counter:Temperature,Missing,1
\n”);

6)或者用javaAPI获取计数器值。根据配置信息创建cluster对象,根据jobid获取job,获取job的计数器,根据类路径获取计数器的值,counters.findCounter(MaxtemperatureCounters.Temperature.MISSING).getValue();

自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

https://www.cnblogs.com/bclshuai/p/11380657.html

原文地址:https://www.cnblogs.com/bclshuai/p/12297802.html

时间: 2024-10-06 00:21:15

9.1hadoop 内置计数器、自定义枚举计数器、Streaming计数器的相关文章

6.python模块(导入,内置,自定义,开源)

一.模块 1.模块简介 模块是一个包含所有你定义的函数和变量的文件,其后缀名是.py.模块可以被别的程序引入,以使用该模块中的函数等功能.这也是使用python标准库的方法. 类似于函数式编程和面向过程编程,函数式编程则完成一个功能,其他代码用来调用即可,提供了代码的重用性和代码间的耦合.而对于一个复杂的功能来,可能需要多个函数才能完成(函数又可以在不同的.py文件中),n个 .py 文件组成的代码集合就称为模块. 2.模块的引入 在Python中用关键字import来引入某个模块,比如要引用模

13t天 迭代器,生成器,内置函数

上周复习: 函数的递归调用 函数调用时函数嵌套调用的一种特殊形式 函数在调用时,直接或间接调用了自身,就是梯归调用. 直接调用自身 def f1():   print('from f1')   f1()f1() 间接调用函数 def f1():   print('from f1')   f2()?def f2():   print('from f2')   f1()f1() 梯归 递归应该分为两个明确的阶段,回溯与递推. 回溯就是从外向里一层一层递归调用下去,回溯阶段必须要有一个明确地结束条件,

Shiro内置过滤器

Shiro内置过滤器 DefaultFilter 枚举类定义了shiro所有的默认过滤器. package org.apache.shiro.web.filter.mgt; public enum DefaultFilter { anon(AnonymousFilter.class), authc(FormAuthenticationFilter.class), authcBasic(BasicHttpAuthenticationFilter.class), logout(LogoutFilte

Flume-NG内置计数器(监控)源码级分析

Flume的内置监控怎么整?这个问题有很多人问.目前了解到的信息是可以使用Cloudera Manager.Ganglia有图形的监控工具,以及从浏览器获取json串,或者自定义向其他监控系统汇报信息.那监控的信息是什么呢?就是各个组件的统计信息,比如成功接收的Event数量.成功发送的Event数量,处理的Transaction的数量等等.而且不同的组件有不同的Countor来做统计,目前直到1.5版本仍然只对三大组件:source.sink.channel进行统计分别是SourceCount

Java多线程编程6--单例模式与多线程--使用静态内置类、(反)序列化、static代码块、enum枚举数据类实现

前面讲的用DCL可以解决多线程单例模式的非线程安全,虽然看下去十分完美,但还是有一些问题,具体分析看这篇:http://blog.csdn.net/ochangwen/article/details/51348078 当然用其他的办法也能达到同样的效果. 1.使用静态内置类实现单例模式 public class Singleton { /* 私有构造方法,防止被实例化 */ private Singleton() { } /* 此处使用一个内部类来维护单例 */ private static c

.Net——使用.net内置处理程序处理自定义节点Demo

在.net中,因为对不同的节点,都对应着类去对它进行处理,.net里面为了方便,已经内置了一些类供我们使用,使我们在读取配置文件时,不必自己去定义类去处理自己定义的自定义节点. 下面我们写了这样一个配置文件: <?xml version="1.0" encoding="utf-8" ?> <configuration> <configSections> <!--使用IgnoreSection处理自定义节点--> <

浏览器扩展系列————给MSTHML添加内置脚本对象【包括自定义事件】

原文:浏览器扩展系列----给MSTHML添加内置脚本对象[包括自定义事件] 使用场合: 在程序中使用WebBrowser或相关的控件如:axWebBrowser等.打开本地的html文件时,可以在html的脚本中使用自己在.net中定义的类,实现与Internet Explorer server的互操作.此外也可以在充分利用html在设计界面方面高效,简单的同时,也可以实现一些复杂的特性. 实现: Code Code highlighting produced by Actipro CodeH

struts2内置拦截器和自定义拦截器详解(附源码)

一.Struts2内置拦截器 Struts2中内置类许多的拦截器,它们提供了许多Struts2的核心功能和可选的高级特 性.这些内置的拦截器在struts-default.xml中配置.只有配置了拦截器,拦截器才可以正常的工作和运行.Struts 2已经为您提供丰富多样的,功能齐全的拦截器实现.大家可以至struts2的jar包内的struts-default.xml查看关于默认的拦截器与 拦截器链的配置.内置拦截器虽然在struts2中都定义了,但是并不是都起作用的.因为并不是所有拦截器都被加

Mysql函数(内置函数,自定义函数)

简述 SQL:结构化查询语言,是一门编程语言,是用于管理数据库的编程语言. 元素:数据,数据类型,变量,函数,流程控制,运算符,注释. 注释: 行: # –[空格] 块: /* */ select * from swpu_stu #where id=2; ; select * from swpu_stu -- where id=2; ; 结束符: select * from swpu_stu where id=2\g select * from swpu_stu where id=2\G 可以使