1. MapReduce与mysql连接总结
应用场景:
在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv、uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些是 hbase 或者 hive 目前亟待改进的地方。
1.从mysql中
读数据:
Hadoop访问关系数据库主要通过一下接口实现的:DBInputFormat类,包所在位置:org.apache.hadoop.mapred.lib.db 中。DBInputFormat 在 Hadoop 应用程序中通过数据库供应商提供的 JDBC接口来与数据库进行交互,并且可以使用标准的 SQL 来读取数据库中的记录。学习DBInputFormat首先必须知道二个条件。
- 在使用 DBInputFormat 之前,必须将要使用的 JDBC 驱动拷贝到分布式系统各个节点的$HADOOP_HOME/lib/目录下。
- MapReduce访问关系数据库时,大量频繁的从MapReduce程序中查询和读取数据,这大大的增加了数据库的访问负载,因此,DBInputFormat接口仅仅适合读取小数据量的数据,而不适合处理数据仓库。要处理数据仓库的方法有:利用数据库的Dump工具将大量待分析的数据输出为文本,并上传的Hdfs中进行处理,处理的方法可参考:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html
写数据:
往往对于数据处理的结果的数据量一般不会太大,可能适合hadoop直接写入数据库中。hadoop提供了相应的数据库直接输出的计算发结果。
-
- DBOutFormat: 提供数据库写入接口。
- DBRecordWriter:提供向数据库中写入的数据记录的接口。
- DBConfiguration:提供数据库配置和创建链接的接口
2.Hive常见命令
Hive导出查询内容: INSERT OVERWRITE LOCAL DIRECTORY ‘/tmp/result.txt‘ select id,name from t_test;
hive -e"select id,name from t_test;"> result.txt
连接hive的三种方式:
1.cli 本质上是每个连接都存放一个元数据,各个之间都不相同,不适合做产品的开发和应用
2.JDBC连接的方式,容易被大数据量冲挂,不稳定
3. 直接利用Hive的 Driver class 来直接连接 Driver driver = new Driver(new HiveConf(SessionState.class));
远程连接Hive
hive --service hiveserver -p 50000 &
打开50000端口,然后java就可以使用java连了,将所需的jar包做个标记
HQL结果直接导入mysql
1、首先下载mysql-connector-java jar包。
2、在hive cli端添加必要jar:
add jar /home/hadoop/hive-0.12.0/lib/hive-contrib-0.12.0.jar;
add jar /home/hadoop/hive-0.12.0/lib/mysql-connector-java-5.1.27-bin.jar;
3、给指点方法弄个简称:
CREATE TEMPORARY FUNCTION dboutput AS ‘org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput‘;
4、执行:
select dboutput(‘jdbc:mysql://localhost:port/dbname‘,‘db_username‘,‘db_pwd‘,‘INSERT INTO mysql_table(field1,field2,field3) VALUES (6,?,?)‘,substr(field_i,1,10),count(field_j)) from hive_table group by substr(field_i,1,10) limit 10;
问题:
发现总提示找不到org.apache.Hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput
解决办法:
后来经过琢磨才弄明白org.apache.Hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput部分自己要去编写,编写后打成jar包 用add jar添加进去就可以了。
Python连接Hive
import sys from hive_service import ThriftHive from hive_service.ttypes import HiveServerException from thrift import Thrift from thrift.transport import TSocket from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol try: transport = TSocket.TSocket(‘192.168.30.201‘, 10000) transport = TTransport.TBufferedTransport(transport) protocol = TBinaryProtocol.TBinaryProtocol(transport) client = ThriftHive.Client(protocol) transport.open() hql = ‘‘‘CREATE TABLE people(a STRING, b INT, c DOUBLE) row format delimited fields terminated by ‘,‘ ‘‘‘ print hql client.execute(hql) client.execute("LOAD DATA LOCAL INPATH ‘/home/diver/data.txt‘ INTO TABLE people") #client.execute("SELECT * FROM people") #while (1): # row = client.fetchOne() # if (row == None): # break # print row client.execute("SELECT count(*) FROM people") print client.fetchAll() transport.close() except Thrift.TException, tx: print ‘%s‘ % (tx.message)
#!/usr/bin/python #-*-coding:UTF-8 -*- import sys import os import string import re import MySQLdb from hive_service import ThriftHive from hive_service.ttypes import HiveServerException from thrift import Thrift from thrift.transport import TSocket from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol def hiveExe(hsql,dbname): #定义hive查询函数 try: transport = TSocket.TSocket(‘192.168.10.1‘, 10000) transport = TTransport.TBufferedTransport(transport) protocol = TBinaryProtocol.TBinaryProtocol(transport) client = ThriftHive.Client(protocol) transport.open() client.execute(‘ADD jar /opt/modules/hive/hive-0.7.1/lib/hive-contrib-0.7.1.jar‘) client.execute("use "+dbname) row = client.fetchOne() #使用库名,只需一次fetch,用fetchOne client.execute(hsql) return client.fetchAll() #查询所有数据,用fetchAll() transport.close() except Thrift.TException, tx: print ‘%s‘ % (tx.message) def mysqlExe(sql): try: conn = MySQLdb.connect(user="test",passwd="test123",host="127.0.0.1",db="active2_ip",port=5029) except Exception,data: print "Could not connect to MySQL server.:",data try: cursor = conn.cursor() cursor.execute(sql) return row cursor.commit() cursor.close() conn.close() except Exception,data: print "Could not Fetch anything:",data dbname = "active2" date = os.popen("date -d ‘1 day ago‘ +%Y%m%d").read().strip() #shell方式取昨天日期,读取并去前后\n date.close() sql = "create table IF NOT EXISTS "+dbname+"_group_ip_"+date+" like "+dbname+"_group_ip;load data infile ‘/tmp/"+dbname+"_"+date+".csv‘ into table "+dbname+"_group_ip_"+date+" FIELDS TERMINATED BY ‘,‘" #以模板表创建日期表,并load data到该表中 hsql = "insert overwrite local directory ‘/tmp/"+dbname+"_"+date+"‘ select count(version) as vc,stat_hour,type,version,province,city,isp from "+dbname+"_"+date+" group by province,city,version,type,stat_hour,isp" #hive查询,并将查询结果导出到本地/tmp/active2_20111129目录下,可能生成多个文件 hiveExe(hsql, dbname) #执行查询 os.system("sudo cat /tmp/"+dbname+"_"+date+"/* > /tmp/tmplog ") #将多个文件通过shell合并为一个文件tmplog file1 = open("/tmp/tmplog", ‘r‘) #打开合并后的临时文件 file2 = open("/tmp/"+dbname+"_"+date+".csv",‘w‘) #打开另一个文件,做文字替换。因为hive导出结果,其分隔符为特殊字符。所以需要做替换,格式为csv,故用逗号分隔 sep = ‘,‘ for line in file1: tmp = line[:-1].split(‘\x01‘) #hive导出文件分隔符为ascii中的001,\x01是16进制,但其实也就是十进制的1 replace = sep.join(tmp) file2.write(replace+"\n") file1.close() file2.close() os.system("sudo rm -f /tmp/tmplog") #删除临时的tmplog mysqlExe(sql) #执行mysql查询,创建表和加载数据。 os.system("sudo rm -f /tmp/"+dbname+"_"+date)
Thrift是Apache的一个开源的跨语言服务开发框架,它提供了一个代码生成引擎来构建服务,支持C++,Java,Python,PHP,Ruby,Erlang,Perl,Haskell,C#,Cocoa,JavaScript,Node.js,Smalltalk,OCaml,Delphi等多种编程语言。
一般来说,使用Thrift来开发应用程序,主要建立在两种场景下:
- 第一,在我们开发过程中,一个比较大的项目需要多个团队进行协作,而每个团队的成员在编程技术方面的技能可能不一定相同,为了实现这种跨语言的开发氛围,使用Thrift来构建服务
- 第二,企业之间合作,在业务上不可避免出现跨语言的编程环境,使用Thrift可以达到类似Web Services的跨平台的特性
Python就是用Thrift来连接Hive的
#!/bin/sh # 一键安装thrift-0.9.0的脚本 # thrift依赖boost、openssl和libevent # 下面的变量值可以根据实现做修改 PROJECT_HOME=$HOME/iflow # 项目源码主目录 # thrift及依赖的第三方库源码包存放目录和安装目录, # 一键脚本要和第三方库源码包放在同一个目录下 THIRD_PARTY_HOME=$PROJECT_HOME/third-party boost=boost_1_52_0 openssl=openssl-1.0.1c libevent=libevent-2.0.19-stable thrift=thrift-0.9.0 # # 安装boost # printf "n33[0;32;34minstalling boost33[mn" tar xzf $boost.tar.gz cd $boost ./bootstrap.sh if test $? -ne 0; then exit 1 fi ./b2 install --prefix=$THIRD_PARTY_HOME/boost printf "n33[0;32;34m./b2 install return $?33[mn" cd - # # 安装openssl # printf "n33[0;32;34minstalling openssl33[mn" tar xzf $openssl.tar.gz cd $openssl ./config --prefix=$THIRD_PARTY_HOME/openssl shared threads if test $? -ne 0; then exit 1 fi make if test $? -ne 0; then exit 1 fi make install cd - # # 安装libevent # printf "n33[0;32;34minstalling libevent33[mn" tar xzf $libevent.tar.gz cd $libevent ./configure --prefix=$THIRD_PARTY_HOME/libevent if test $? -ne 0; then exit 1 fi make if test $? -ne 0; then exit 1 fi make install cd - # # 安装thrift # printf "n33[0;32;34minstalling thrift33[mn" tar xzf $thrift.tar.gz cd $thrift # 按照常规的configure,使用--with-openssl,会遇到 # “Error: libcrypto required.”错误,这里使用CPPFLAGS和LDFLAGS替代 ./configure --prefix=$THIRD_PARTY_HOME/thrift --with-boost=$THIRD_PARTY_HOME/boost --with-libevent=$THIRD_PARTY_HOME/libevent CPPFLAGS="-I$THIRD_PARTY_HOME/openssl/include" LDFLAGS="-ldl -L$THIRD_PARTY_HOME/openssl/lib" --with-qt4=no --with-c_glib=no --with-csharp=no --with-java=no --with-erlang=no --with-python=no --with-perl=no --with-ruby=no --with-haskell=no --with-go=no --with-d=no if test $? -ne 0; then exit 1 fi # 完成上述修改后,configure可以成功了,但还需要下面修改, # 否则make时会报malloc未声明 sed -i -e ‘s!#define HAVE_MALLOC 0!#define HAVE_MALLOC 1!‘ config.h sed -i -e ‘s!#define HAVE_REALLOC 0!#define HAVE_REALLOC 1!‘ config.h sed -i -e ‘s!#define malloc rpl_malloc!/*#define malloc rpl_malloc*/!‘ config.h sed -i -e ‘s!#define realloc rpl_realloc!/*#define realloc rpl_realloc*/!‘ config.h make if test $? -ne 0; then exit 1 fi make install cd - # 安装成功提示一下 printf "n33[0;32;34minstall SUCCESS33[mn"
hive的结果导入到mysql报错 参考 Hiveserver和Hiveserver2的区别
1、sqoop依赖zookeeper所以必须配置ZOOKEEPER_HOME到环境变量中。
2、sqoop-1.2.0-CDH3B4依赖hadoop-core-0.20.2-CDH3B4.jar所以你需要下载hadoop-0.20.2-CDH3B4.tar.gz解压缩后将hadoop-0.20.2-CDH3B4/hadoop-core-0.20.2-CDH3B4.jar复制到sqoop-1.2.0-CDH3B4/lib中。
3、sqoop导入mysql数据运行过程中依赖mysql-connector-java-.jar所以你需要下载mysql-connector-java-.jar并复制到sqoop-1.2.0-CDH3B4/lib中。
利用udf函数将Hive统计结果直接插入到MySQL
http://www.linuxidc.com/Linux/2013-04/82878.htm
Python脚本将Hive的结果保存到MySQL
http://pslff.diandian.com/post ... 08648