Hive job using a Python script exits with java.io.IOException: Broken pipe

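An HQL query owned by the anti-spam R&D team failed during execution with a java.io.IOException: Broken pipe exception. The query calls a Python script. Nobody had changed the HQL or the Python script recently, and the job still ran fine on October 1, but from October 4 onward it kept failing with the same error, always in the reduce phase of Stage-2. The error output on the gateway was: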

2014-10-10 15:05:32,724 Stage-2 map = 100%,  reduce = 100%
Ended Job = job_201406171104_4019895 with errors
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask

The job's error message on the JobTracker page:

2014-10-10 15:00:29,614 WARN org.apache.hadoop.mapred.Child: Error running child
java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":"1000390355","reducesinkkey1":"14"},"value":{"_col0":"1000390355","_col1":25,"_col2":"Infinity","_col3":"14","_col4":17},"alias":0}
	at org.apache.hadoop.hive.ql.exec.ExecReducer.reduce(ExecReducer.java:268)
	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:518)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:419)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1061)
	at org.apache.hadoop.mapred.Child.main(Child.java:253)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":"1000390355","reducesinkkey1":"14"},"value":{"_col0":"1000390355","_col1":25,"_col2":"Infinity","_col3":"14","_col4":17},"alias":0}
	at org.apache.hadoop.hive.ql.exec.ExecReducer.reduce(ExecReducer.java:256)
	... 7 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: Broken pipe
	at org.apache.hadoop.hive.ql.exec.ScriptOperator.processOp(ScriptOperator.java:348)
	at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
	at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:744)
	at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
	at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
	at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:744)
	at org.apache.hadoop.hive.ql.exec.ExtractOperator.processOp(ExtractOperator.java:45)
	at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
	at org.apache.hadoop.hive.ql.exec.ExecReducer.reduce(ExecReducer.java:247)
	... 7 more
Caused by: java.io.IOException: Broken pipe
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:260)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
	at java.io.DataOutputStream.write(DataOutputStream.java:90)
	at org.apache.hadoop.hive.ql.exec.TextRecordWriter.write(TextRecordWriter.java:43)
	at org.apache.hadoop.hive.ql.exec.ScriptOperator.processOp(ScriptOperator.java:331)
	... 15 more

stderr logs:

Traceback (most recent call last):
  File "/data10/hadoop/local/taskTracker/liangjun/jobcache/job_201406171104_4019895/attempt_201406171104_4019895_r_000000_0/work/./pranalysis.py", line 86, in <module>
    pranalysis(cols[0],pr,cols[1],cols[4],prnum)
  File "/data10/hadoop/local/taskTracker/liangjun/jobcache/job_201406171104_4019895/attempt_201406171104_4019895_r_000000_0/work/./pranalysis.py", line 60, in pranalysis
    print '%s\t%d\t%d\t%d'%(uid,v[14]-20,type,rank)
TypeError: %d format: a number is required, not float

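Judging from the job error messages above, the preliminary diagnosis was that the data produced after October 1 contained something abnormal that made the Python script exit mid-run, closing its end of the data pipe. The reducer (ExecReducer.reduce(), which writes to the script through ScriptOperator.processOp() and TextRecordWriter.write() in the trace above) does not know that the pipe to Python has been closed by the exception and keeps writing rows into it, and that write fails with java.io.IOException: Broken pipe.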
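The mechanism can be reproduced outside Hive. Below is a minimal Python 3 sketch, assuming a POSIX system such as Linux: a child process stands in for pranalysis.py and dies with an exception before reading its input, and the parent, standing in for Hive's ScriptOperator, then writes a row to the child's stdin. The function name reproduce_broken_pipe is hypothetical. Python ignores SIGPIPE by default, so the failed write surfaces as BrokenPipeError with errno EPIPE, the same condition the JVM reports as java.io.IOException: Broken pipe.

```python
import errno
import subprocess
import sys


def reproduce_broken_pipe():
    """Return (child exit code, errno seen by the parent) after the child dies."""
    # Stand-in for pranalysis.py: the child raises before reading any input.
    child = subprocess.Popen(
        [sys.executable, "-c", "raise TypeError('simulated crash')"],
        stdin=subprocess.PIPE,
        stderr=subprocess.DEVNULL,  # hide the child's traceback
        bufsize=0,  # unbuffered, so write() goes straight to the pipe
    )
    child.wait()  # the child has exited; the pipe now has no reader
    seen = None
    try:
        # Stand-in for Hive's ScriptOperator: keep sending rows to the script.
        child.stdin.write(b"1000390355\t25\tInfinity\t14\t17\n")
    except BrokenPipeError as exc:
        seen = exc.errno  # EPIPE, which the JVM reports as "Broken pipe"
    finally:
        child.stdin.close()
    return child.returncode, seen


print(reproduce_broken_pipe() == (1, errno.EPIPE))
```

The important detail is the ordering: the consumer is already gone when the producer writes, so the producer only learns about the crash through the failed write, exactly as in the Java stack trace.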

The analysis went as follows:

1. The HQL and the Python script

The HQL:

add file /usr/home/wbdata_anti/shell/sass_offline/pranalysis.py;
select transform(BS.*) using 'pranalysis.py' as uid,prvalue,trend,prlevel
from
(
select B1.uid,B1.flws,B1.pr,iter,B2.alivefans from tmp_anti_user_pagerank1 B1
join
mds_anti_user_flwpr B2
on B1.uid=B2.uid
where iter>'00' and iter<='14' and dt='lowrlfans20141001'
distribute by uid sort by uid,iter
)BS;
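For reference, TRANSFORM ... USING streams each input row to the script's stdin as one line of tab-separated text (here uid, flws, pr, iter, alivefans), and each tab-separated line the script prints becomes an output row with the columns named after AS. Because of DISTRIBUTE BY uid SORT BY uid,iter, all rows of one uid reach the same reducer in iter order, which is what the script's lastuid/lastiter bookkeeping relies on. The following Python 3 sketch consumes such a stream with itertools.groupby instead of that manual bookkeeping; group_by_uid is a hypothetical helper, not part of pranalysis.py, and it omits the script's iter filtering.

```python
import itertools


def group_by_uid(lines):
    """Group tab-separated rows (uid, flws, pr, iter, alivefans) by uid.

    Assumes the input is already sorted by uid, iter, as DISTRIBUTE BY uid
    SORT BY uid, iter guarantees within one reducer.
    """
    rows = (line.rstrip("\n").split("\t") for line in lines)
    rows = (cols for cols in rows if len(cols) >= 5)  # skip short rows, like the script
    for uid, group in itertools.groupby(rows, key=lambda cols: cols[0]):
        group = list(group)
        prs = [cols[2] for cols in group]  # pr strings, in iter order
        last = group[-1]
        yield uid, prs, last[1], last[4]  # flws and alivefans of the last iter


sample = [
    "1000390355\t25\tInfinity\t14\t17\n",              # row from the JobTracker log
    "1642909335\t919\t0.00070398898\t04\t68399779\n",  # row from the script's comment
    "1642909335\t919\t0.00071\t05\t68399779\n",        # illustrative extra iteration
]
for uid, prs, flws, fans in group_by_uid(sample):
    print(uid, prs, flws, fans)
```

Inside a real TRANSFORM script the call would be group_by_uid(sys.stdin).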

The Python script:

#!/usr/bin/python
#coding=utf-8
import sys,time
import re,math
from optparse import OptionParser
import ConfigParser

reload(sys)
sys.setdefaultencoding('utf-8')

parser = OptionParser(usage="usage:%prog [options] filepath")
parser.add_option("-i", "--iter",action = "store",type = 'string', dest = "iter",  default = '14',
		help="how many iterators" )
(options, args) = parser.parse_args()

def pranalysis(uid,prs,flw,fans,prnum):
	tasc=tdesc=0

	try:
		v=[float(pr)*100000000000 for pr in prs]
		fans=int(fans)
		interval=fans/100
	except:  # float("Infinity") parses fine, so an Infinity pr is NOT caught here
		#rst=sys.exc_info()
		#sys.excepthook(rst[0],rst[1],rst[2])
		return
	for i in range(1,prnum-1):
		if i==1:
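			if v[i+1]-v[i]>interval and v>fans:	tasc += 1	# NOTE: v>fans compares the whole list v with an int (always True in Python 2); an element of v was probably intended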
			elif v[i]-v[i+1]>interval and v[i+1]<fans:	tdesc += 1
			continue
		if v[i+1]-v[i]>interval:	tasc += 1
		elif v[i]-v[i+1]>interval:	tdesc += 1

	# rank indicate the rate between pr and fans. higher rank(big number) mean more possible negative user
	rate=v[prnum-1]/fans
	rank=4
	if rate>3.0: rank=0
	elif rate>2.0: rank=1
	elif rate>1.3: rank=2
	elif rate>0.7: rank=3
	elif rate>0.5: rank=4
	elif rate>0.3: rank=5
	elif rate>0.2: rank=6
	else: rank=7

	# 0 for stable trend. 1 for round trend,  2, for positive user, 3 for negative user.
	type=0
	if tasc>0 and tdesc>0:
		type=1
	elif tasc>0:
		type=2
	elif tdesc>0:
		type=3
	else: 		# tdesc=0 and tasc=0
		type=0
	#if fans<60:
	#	type=0

	print '%s\t%d\t%d\t%d'%(uid,v[14]-20,type,rank)

#format	sort by uid, iter
#uid            follow        pr        iter        fans
#1642909335      919     0.00070398898   04      68399779

prnum=int(options.iter)+1
pr=[0]*prnum
idx=1
lastiter='00'
lastuid=''
for line in sys.stdin:
	line=line.rstrip('\n')
	cols=line.split('\t')
	if len(cols)<5: continue
	if cols[3]>options.iter or cols[3]=='00':	continue
	if cols[3]<=lastiter:
		print '%s\t%d\t%d\t%d'%(lastuid,2,0,7)
		pr=[0]*prnum
		idx=1
	lastiter=cols[3]
	lastuid=cols[0]
	pr[idx]=cols[2]
	idx+=1
	if cols[3]==options.iter:
		pranalysis(cols[0],pr,cols[1],cols[4],prnum)
		pr=[0]*prnum
		lastiter='00'
		idx=1

2. Execution plan of the Stage-2 reduce phase:

      Reduce Operator Tree:
        Extract
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: bigint
                  expr: _col2
                  type: string
                  expr: _col3
                  type: string
                  expr: _col4
                  type: bigint
            outputColumnNames: _col0, _col1, _col2, _col3, _col4
            Transform Operator
              command: pranalysis.py
              output info:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              File Output Operator
                compressed: false
                GlobalTableId: 0
                table:
                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

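The execution plan shows that the Stage-2 reduce phase is actually simple: it runs the rows received from the map phase through pranalysis.py, turning 5 columns into 4. The script's output line imposes a format on the data: apart from uid, every field is printed with %d, i.e. as an integer: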

print '%s\t%d\t%d\t%d'%(uid,v[14]-20,type,rank)
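A quick Python 3 check of how this format string treats floats, using a hypothetical wrapper format_output around it: a finite float is truncated toward zero, while a pr of "Infinity" passes float() and the arithmetic unchanged and then cannot be converted to an integer. Python 3 reports that failure as OverflowError; Python 2.7, which pranalysis.py is written for, reports the same failed conversion as "TypeError: %d format: a number is required, not float".

```python
def format_output(uid, value, trend, rank):
    """Build one output line with the same format string as pranalysis.py."""
    return "%s\t%d\t%d\t%d" % (uid, value, trend, rank)


# A finite float is accepted: %d truncates it toward zero.
finite_line = format_output("1000390355", 12.9, 2, 3)

# "Infinity" passes float() and survives the arithmetic unchanged ...
v14 = float("Infinity") * 100000000000
# ... but %d cannot convert an infinite float to an integer.
failure = None
try:
    format_output("1000390355", v14 - 20, 0, 7)
except (OverflowError, TypeError) as exc:  # Python 3 raises OverflowError here
    failure = type(exc).__name__

print(repr(finite_line), failure)
```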

Combining what the execution plan points to with the job's stderr logs:

Traceback (most recent call last):
  File "/data10/hadoop/local/taskTracker/liangjun/jobcache/job_201406171104_4019895/attempt_201406171104_4019895_r_000000_0/work/./pranalysis.py", line 86, in <module>
    pranalysis(cols[0],pr,cols[1],cols[4],prnum)
  File "/data10/hadoop/local/taskTracker/liangjun/jobcache/job_201406171104_4019895/attempt_201406171104_4019895_r_000000_0/work/./pranalysis.py", line 60, in pranalysis
    print '%s\t%d\t%d\t%d'%(uid,v[14]-20,type,rank)
TypeError: %d format: a number is required, not float

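This confirms that the query failed inside the Python script because of abnormal data: a value the script computed was a float that the %d in its format string could not print as an integer, so the script exited with an exception. Exiting closed the data pipe, but ExecReducer.reduce() did not know that the pipe to Python had been closed and kept writing into it, which produced java.io.IOException: Broken pipe. To be precise, %d does truncate an ordinary float; what fails is a float with no integer value. The row reported in the JobTracker log has _col2 (pr) = "Infinity" at iter 14, and float("Infinity") makes v[14]-20 infinite, which fits the fact that the job started failing when the data changed while the code stayed the same.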
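One way to harden the script, offered as a hypothetical Python 3 sketch rather than the fix the team applied: check the value before formatting, write a note about the bad row to stderr (which ends up in the task's stderr log, as the traceback above did) and skip the row, so a single bad row no longer kills the script and breaks Hive's pipe. safe_format_row is a hypothetical name.

```python
import math
import sys


def safe_format_row(uid, value, trend, rank):
    """Return the output line, or None after logging to stderr if value is not finite."""
    if not math.isfinite(value):
        # inf/nan (e.g. from a pr of "Infinity") cannot be formatted with %d.
        sys.stderr.write("skip uid=%s: non-finite value %r\n" % (uid, value))
        return None
    return "%s\t%d\t%d\t%d" % (uid, value, trend, rank)


for value in (12.9, float("inf"), float("nan")):
    line = safe_format_row("1000390355", value, 0, 7)
    if line is not None:
        print(line)
```

Skipping is only one choice; whether such users should be dropped, clamped or flagged is a business decision. Python 2.7 has no math.isfinite, so there the equivalent check is math.isinf(value) or math.isnan(value).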


Date: 2024-11-06 10:19:08
