标签整理

1. MapReduce与mysql连接总结

应用场景:

  在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv、uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些是 hbase 或者 hive 目前亟待改进的地方。

1.从mysql中

读数据:

  Hadoop访问关系数据库主要通过一下接口实现的:DBInputFormat类,包所在位置:org.apache.hadoop.mapred.lib.db 中。DBInputFormat 在 Hadoop 应用程序中通过数据库供应商提供的 JDBC接口来与数据库进行交互,并且可以使用标准的 SQL 来读取数据库中的记录。学习DBInputFormat首先必须知道二个条件。

  1. 在使用 DBInputFormat 之前,必须将要使用的 JDBC 驱动拷贝到分布式系统各个节点的$HADOOP_HOME/lib/目录下。
  2. MapReduce访问关系数据库时,大量频繁的从MapReduce程序中查询和读取数据,这大大的增加了数据库的访问负载,因此,DBInputFormat接口仅仅适合读取小数据量的数据,而不适合处理数据仓库。要处理数据仓库的方法有:利用数据库的Dump工具将大量待分析的数据输出为文本,并上传的Hdfs中进行处理,处理的方法可参考:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html

写数据:

  往往对于数据处理的结果的数据量一般不会太大,可能适合hadoop直接写入数据库中。hadoop提供了相应的数据库直接输出的计算发结果。

    1.   DBOutFormat: 提供数据库写入接口。
    2.   DBRecordWriter:提供向数据库中写入的数据记录的接口。
    3. DBConfiguration:提供数据库配置和创建链接的接口

2.Hive常见命令

  Hive常用的SQL命令操作

  Hive导出查询内容:  INSERT OVERWRITE LOCAL DIRECTORY  ‘/tmp/result.txt‘ select id,name from t_test;

             hive -e"select id,name from t_test;"> result.txt

连接hive的三种方式:

  1.cli  本质上是每个连接都存放一个元数据,各个之间都不相同,不适合做产品的开发和应用

  2.JDBC连接的方式,容易被大数据量冲挂,不稳定

  3. 直接利用Hive的 Driver class 来直接连接  Driver driver = new Driver(new HiveConf(SessionState.class));

远程连接Hive

  hive --service hiveserver -p 50000 &

  打开50000端口,然后java就可以使用java连了,将所需的jar包做个标记

HQL结果直接导入mysql

1、首先下载mysql-connector-java jar包。

2、在hive cli端添加必要jar:

add jar /home/hadoop/hive-0.12.0/lib/hive-contrib-0.12.0.jar;

add jar /home/hadoop/hive-0.12.0/lib/mysql-connector-java-5.1.27-bin.jar;

3、给指点方法弄个简称:

CREATE TEMPORARY FUNCTION dboutput AS ‘org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput‘;

4、执行:

select dboutput(‘jdbc:mysql://localhost:port/dbname‘,‘db_username‘,‘db_pwd‘,‘INSERT INTO mysql_table(field1,field2,field3) VALUES (6,?,?)‘,substr(field_i,1,10),count(field_j)) from hive_table group by substr(field_i,1,10) limit 10;

问题:

发现总提示找不到org.apache.Hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput

解决办法:

后来经过琢磨才弄明白org.apache.Hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput部分自己要去编写,编写后打成jar包 用add jar添加进去就可以了。

Python连接Hive
import sys
from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

try:
    transport = TSocket.TSocket(‘192.168.30.201‘, 10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    client = ThriftHive.Client(protocol)
    transport.open()
    hql = ‘‘‘CREATE TABLE people(a STRING, b INT, c DOUBLE) row format delimited fields terminated by ‘,‘ ‘‘‘
    print hql

    client.execute(hql)
    client.execute("LOAD DATA LOCAL INPATH ‘/home/diver/data.txt‘ INTO TABLE people")
    #client.execute("SELECT * FROM people")
    #while (1):
    #  row = client.fetchOne()
    #  if (row == None):
    #    break
    #  print row
    client.execute("SELECT count(*) FROM people")
    print client.fetchAll()

    transport.close()

except Thrift.TException, tx:
    print ‘%s‘ % (tx.message)

  

#!/usr/bin/python
#-*-coding:UTF-8 -*-
import sys
import os
import string
import re
import MySQLdb

from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

def hiveExe(hsql,dbname):
#定义hive查询函数
                try:
                                transport = TSocket.TSocket(‘192.168.10.1‘, 10000)
                                transport = TTransport.TBufferedTransport(transport)
                                protocol = TBinaryProtocol.TBinaryProtocol(transport)

                                client = ThriftHive.Client(protocol)
                                transport.open()

                                client.execute(‘ADD jar /opt/modules/hive/hive-0.7.1/lib/hive-contrib-0.7.1.jar‘)

                                client.execute("use "+dbname)
                                row = client.fetchOne()
                                #使用库名,只需一次fetch,用fetchOne

                                client.execute(hsql)
                                return client.fetchAll()
                                #查询所有数据,用fetchAll()

                                transport.close()

                except Thrift.TException, tx:
                                print ‘%s‘ % (tx.message)

def mysqlExe(sql):
                try:
                                conn = MySQLdb.connect(user="test",passwd="test123",host="127.0.0.1",db="active2_ip",port=5029)
                except Exception,data:
                                print "Could not connect to MySQL server.:",data
                try:
                                cursor = conn.cursor()
                                cursor.execute(sql)
                                return row
                                cursor.commit()
                                cursor.close()
                                conn.close()
                except Exception,data:
                                print "Could not Fetch anything:",data

dbname = "active2"
date = os.popen("date -d ‘1 day ago‘ +%Y%m%d").read().strip()
#shell方式取昨天日期,读取并去前后\n
date.close()

sql = "create table IF NOT EXISTS "+dbname+"_group_ip_"+date+" like "+dbname+"_group_ip;load data infile ‘/tmp/"+dbname+"_"+date+".csv‘ into table "+dbname+"_group_ip_"+date+" FIELDS TERMINATED BY ‘,‘"
#以模板表创建日期表,并load data到该表中

hsql = "insert overwrite local directory ‘/tmp/"+dbname+"_"+date+"‘ select count(version) as vc,stat_hour,type,version,province,city,isp from "+dbname+"_"+date+" group by province,city,version,type,stat_hour,isp"
#hive查询,并将查询结果导出到本地/tmp/active2_20111129目录下,可能生成多个文件

hiveExe(hsql, dbname)
#执行查询

os.system("sudo cat /tmp/"+dbname+"_"+date+"/* > /tmp/tmplog ")
#将多个文件通过shell合并为一个文件tmplog

file1 = open("/tmp/tmplog", ‘r‘)
#打开合并后的临时文件
file2 = open("/tmp/"+dbname+"_"+date+".csv",‘w‘)
#打开另一个文件,做文字替换。因为hive导出结果,其分隔符为特殊字符。所以需要做替换,格式为csv,故用逗号分隔
sep = ‘,‘
for line in file1:
                tmp = line[:-1].split(‘\x01‘)
                #hive导出文件分隔符为ascii中的001,\x01是16进制,但其实也就是十进制的1
                replace = sep.join(tmp)
                file2.write(replace+"\n")

file1.close()
file2.close()

os.system("sudo rm -f /tmp/tmplog")
#删除临时的tmplog

mysqlExe(sql)
#执行mysql查询,创建表和加载数据。
os.system("sudo rm -f /tmp/"+dbname+"_"+date)

 Thrift是Apache的一个开源的跨语言服务开发框架,它提供了一个代码生成引擎来构建服务,支持C++,Java,Python,PHP,Ruby,Erlang,Perl,Haskell,C#,Cocoa,JavaScript,Node.js,Smalltalk,OCaml,Delphi等多种编程语言。

一般来说,使用Thrift来开发应用程序,主要建立在两种场景下:

  • 第一,在我们开发过程中,一个比较大的项目需要多个团队进行协作,而每个团队的成员在编程技术方面的技能可能不一定相同,为了实现这种跨语言的开发氛围,使用Thrift来构建服务
  • 第二,企业之间合作,在业务上不可避免出现跨语言的编程环境,使用Thrift可以达到类似Web Services的跨平台的特性

Python就是用Thrift来连接Hive的

#!/bin/sh
# 一键安装thrift-0.9.0的脚本
# thrift依赖boost、openssl和libevent
# 下面的变量值可以根据实现做修改
PROJECT_HOME=$HOME/iflow # 项目源码主目录
# thrift及依赖的第三方库源码包存放目录和安装目录,
# 一键脚本要和第三方库源码包放在同一个目录下
THIRD_PARTY_HOME=$PROJECT_HOME/third-party
boost=boost_1_52_0
openssl=openssl-1.0.1c
libevent=libevent-2.0.19-stable
thrift=thrift-0.9.0
#
# 安装boost
#
printf "n33[0;32;34minstalling boost33[mn"
tar xzf $boost.tar.gz
cd $boost
./bootstrap.sh
if test $? -ne 0; then
exit 1
fi
./b2 install --prefix=$THIRD_PARTY_HOME/boost
printf "n33[0;32;34m./b2 install return $?33[mn"
cd -
#
# 安装openssl
#
printf "n33[0;32;34minstalling openssl33[mn"
tar xzf $openssl.tar.gz
cd $openssl
./config --prefix=$THIRD_PARTY_HOME/openssl shared threads
if test $? -ne 0; then
exit 1
fi
make
if test $? -ne 0; then
exit 1
fi
make install
cd -
#
# 安装libevent
#
printf "n33[0;32;34minstalling libevent33[mn"
tar xzf $libevent.tar.gz
cd $libevent
./configure --prefix=$THIRD_PARTY_HOME/libevent
if test $? -ne 0; then
exit 1
fi
make
if test $? -ne 0; then
exit 1
fi
make install
cd -
#
# 安装thrift
#
printf "n33[0;32;34minstalling thrift33[mn"
tar xzf $thrift.tar.gz
cd $thrift
# 按照常规的configure,使用--with-openssl,会遇到
# “Error: libcrypto required.”错误,这里使用CPPFLAGS和LDFLAGS替代
./configure --prefix=$THIRD_PARTY_HOME/thrift
           --with-boost=$THIRD_PARTY_HOME/boost
           --with-libevent=$THIRD_PARTY_HOME/libevent
           CPPFLAGS="-I$THIRD_PARTY_HOME/openssl/include"
           LDFLAGS="-ldl -L$THIRD_PARTY_HOME/openssl/lib"
           --with-qt4=no --with-c_glib=no --with-csharp=no
           --with-java=no --with-erlang=no --with-python=no
           --with-perl=no --with-ruby=no --with-haskell=no
           --with-go=no --with-d=no
if test $? -ne 0; then
exit 1
fi
# 完成上述修改后,configure可以成功了,但还需要下面修改,
# 否则make时会报malloc未声明
sed -i -e ‘s!#define HAVE_MALLOC 0!#define HAVE_MALLOC 1!‘ config.h
sed -i -e ‘s!#define HAVE_REALLOC 0!#define HAVE_REALLOC 1!‘ config.h
sed -i -e ‘s!#define malloc rpl_malloc!/*#define malloc rpl_malloc*/!‘ config.h
sed -i -e ‘s!#define realloc rpl_realloc!/*#define realloc rpl_realloc*/!‘ config.h
make
if test $? -ne 0; then
exit 1
fi
make install
cd -
# 安装成功提示一下
printf "n33[0;32;34minstall SUCCESS33[mn"

 hive的结果导入到mysql报错 参考 Hiveserver和Hiveserver2的区别

 1、sqoop依赖zookeeper所以必须配置ZOOKEEPER_HOME到环境变量中。

2、sqoop-1.2.0-CDH3B4依赖hadoop-core-0.20.2-CDH3B4.jar所以你需要下载hadoop-0.20.2-CDH3B4.tar.gz解压缩后将hadoop-0.20.2-CDH3B4/hadoop-core-0.20.2-CDH3B4.jar复制到sqoop-1.2.0-CDH3B4/lib中。

3、sqoop导入mysql数据运行过程中依赖mysql-connector-java-.jar所以你需要下载mysql-connector-java-.jar并复制到sqoop-1.2.0-CDH3B4/lib中。

利用udf函数将Hive统计结果直接插入到MySQL
http://www.linuxidc.com/Linux/2013-04/82878.htm

Python脚本将Hive的结果保存到MySQL
http://pslff.diandian.com/post ... 08648

 

时间: 2024-10-12 17:33:54

标签整理的相关文章

phpcms模板标签整理

{template "content","header"} 调用根目录下phpcms\template\content\header文件 {CHARSET} 字符集 (gbk或者utf-8) {if isset($SEO['title']) && !empty($SEO['title'])} {$SEO['title']}{/if} {$SEO['site_title']} {$SEO['keyword']} {$SEO['description']

HTML常用标签整理

HTML常用标签整理 HTML常用标签整理 HTML转义字符 HTML中注释 元素类型 文本样式相关标签 分区元素 在页面中显示图片 web开发用到的图片格式 像素 链接元素 表格元素 列表元素 表单元素 form的作用 表单使用 富文本输入框可选的第三方工具 表单实现上传图片或者文件 表单实现编号 为控件分组 wireshark抓包工具 按钮元素 iframe标签 details和summary集合的效果 最后几个 参考资料 HTML转义字符 常用转义字符: < < > > 空格

Dede常用标签整理

1.指定显示文章和栏目 指定文章{dede:arclist idlist='9,14,13,15'}<li><a href="[field:arcurl/]">[field:title/]</a></li>{/dede:arclist} 指定栏目 {dede:type typeid='1'}<a href="[field:typelink /]">[field:typename /]</a>{/

HTML5 常用的结构化标签整理

结构化标签优点: 1.方便浏览器处理和识别,提升了网页的质量和语义. 2.减少了大量无意义的div标签,增强代码的可读性. 结构化标签:(header,nav,body,article,section,aside,hgroup,figure,figcaption,footer) <article>定义外部的内容,可以是一篇新的文章 <aside>定义article以外的内容,aside的内容可用作文章的侧边栏 <figure>用于对元素进行组合,使用figcaption

常用meta标签整理

< meta > 元素 概要 标签提供关于HTML文档的元数据.元数据不会显示在页面上,但是对于机器是可读的.它可用于浏览器(如何显示内容或重新加载页面),搜索引擎(关键词),或其他 web 服务. —— W3School 必要属性 属性 值 描述 content some text 定义与http-equiv或name属性相关的元信息 可选属性 属性 值 描述 http-equiv content-type / expire / refresh / set-cookie 把content属性

Html标签整理

行内元素:inline element 1. 也叫内联元素.内嵌元素等:行内元素一般都是基于语义级(semantic)的基本元素,只能容纳文本或其 他内联元素,常见内联元素 “a”: 常见的行内元素: a - 锚点         b - 粗体(不推荐)            br - 换行            em - 强调            i - 斜体            img - 图片            input - 输入框            label - 表格标签

常用HTML标签整理

1.标题标签 <h1>1级标题</h1><h2>2级标题</h2><h3>3级标题</h3><h4>4级标题</h4><h5>5级标题</h5><h6>6级标题</h6> 2.段落标签 <p>文本段落</p> 3.列表标签 //无序列表<ul> <li></li> <li></li>

HTML 标签整理

序号 标签 描述 自闭合 基础 1 <!DOCTYPE> 2 <html> 定义一个 HTML 文档 3 <title> 为文档定义一个标题 4 <body> 定义文档的主体 5 <h1> to <h6> 定义 HTML 标题 6 <p> 定义一个段落 7 <br> 定义简单的折行. Y 8 <hr> 定义水平线. Y 9 <!--...--> 定义一个注释 Y 格式 10 <ac

web.xml常用标签整理(不定期更新)

1 <?xml version="1.0" encoding="UTF-8"?><!-- 标明使用的XML版本和文档编码,此项必须位于第一行,之前是空行注释都不行 --> 2 3 <!-- 4 web.xml学名为配置部署文件,是web应用的入口文件,用于声明系统的各项配置,此文件不是必须的,但也只是最简单的静态项目才没有. 5 xml文件中大小写敏感,书写次序敏感,自上而下加载,所以配置此文件时要注意标签的顺序和大小写. 6 --&g

织梦标签整理

系统标签: {dede:global.cfg_basehost/} {dede:global.cfg_webname/} {dede:global.cfg_powerby/}版权信息 {dede:field.keywords/} {dede:field.description/} {dede:global.cfg_beian/} {dede:global.cfg_templets_skin/}/style/ {dede:global.cfg_templets_skin/}/js/ {dede:g