读取hive文件并将数据导入hbase

package cn.tansun.bd.hbase;

import java.io.IOException;

import java.net.URI;

import java.text.SimpleDateFormat;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.Iterator;

import java.util.List;

import java.util.Map;

import java.util.Map.Entry;

import java.util.Set;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.tansun.bd.utils.JDBCUtils;

/**

 * @author 作者 E-mail: 

 * @version 创建时间:2016年7月5日 下午7:57:17 类说明

 */

public class HiveMySQl2HBaseMR {

	@SuppressWarnings("deprecation")

	public static void main(String[] args) {

		getDatas();

		/*

		 * if (args.length != 3){ System.err.println(

		 * "Usage: HiveMySQl2HBaseMR <table_name><data_input_path><hfile_output_path>"

		 * ); System.exit( 2 ); }

		 */

		Configuration conf = new Configuration();

		conf.addResource("hbase-site.xml");

		String table = "2";

		String input = "hdfs://node11:9000/datas/hivedata5";

		String output = "hdfs://node11:9000/datas/out15";

		HTable htable;

		try {

			// 运行前,删除已存在的中间输出目录

			try {

				FileSystem fs = FileSystem.get(URI.create(output), conf);

				fs.delete(new Path(output), true);

				fs.close();

			} catch (IOException e1) {

				e1.printStackTrace();

			}
			htable = new HTable(conf, table.getBytes());

			Job job = new Job(conf);

			job.setJobName("Generate HFile");

			job.setJarByClass(HiveMySQl2HBaseMR.class);

			job.setInputFormatClass(TextInputFormat.class);

			job.setMapperClass(HiveMySQlMapper.class);

			FileInputFormat.setInputPaths(job, input);

			job.getConfiguration().set("mapred.mapoutput.key.class",

					"org.apache.hadoop.hbase.io.ImmutableBytesWritable");

			job.getConfiguration().set("mapred.mapoutput.value.class",

					"org.apache.hadoop.hbase.KeyValue");

			FileOutputFormat.setOutputPath(job, new Path(output));

			HFileOutputFormat2.configureIncrementalLoad(job, htable);

			try {

				job.waitForCompletion(true);

			} catch (InterruptedException e) {

				e.printStackTrace();

			} catch (ClassNotFoundException e) {

				e.printStackTrace();

			}

		} catch (IOException e) {

			e.printStackTrace();

		}

	}

	public static String tableName;

	public static String cf = null;

	public static String strRowkey = null;

	public static String strIndex = null;

	public static String column_name = null;

	public static String strColumn = null;

	// mysql读取获得cf、rowKey、cloumn, qual

	@SuppressWarnings("rawtypes")

	public static List<Map> getDatas() {

	//	List<Map> listDatas = new ArrayList<Map>();

		String sql = "SELECT DISTINCT s.tableName, ar.rowkey,af.column_family, 	aq.column_hive_index,   aq.column_name FROM "

				+ " archive_htable s, 	archive_hrowkey ar, 	archive_hfamily af, 	archive_hqualifier aq WHERE "

				+ "	s.rowkey_id = ar.rowkey_id  AND ar.family_id = af.family_id    AND s.tableName = ‘2‘";

		List<Map> selectDatas = JDBCUtils.selectDatas(sql);

		for(Map<String,String> metaData:selectDatas){

			if(null==tableName){

				tableName=metaData.get("tableName");

			}

			if(null==cf){

				cf=metaData.get("column_family");

			}

			if(null==strRowkey){

				strRowkey=metaData.get("rowkey");

			}

			String strTempIndex = metaData.get("column_hive_index");

			String strTempName = metaData.get("column_name");

			if (null==strColumn||(null!=strColumn&&"".equals(strColumn))) {

				strColumn = strTempIndex + "	" + strTempName;

			} else {

				strColumn = strColumn + "," + strTempIndex + "	"

						+ strTempName;

			}

		}

		return selectDatas;

	}

}

class HiveMySQlMapper extends

		Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

	String tableName = HiveMySQl2HBaseMR.tableName;

	String cf = HiveMySQl2HBaseMR.cf;

	String rowKey = HiveMySQl2HBaseMR.strRowkey;

	String strColumnName = HiveMySQl2HBaseMR.column_name;

	String strColumn = HiveMySQl2HBaseMR.strColumn;

	String split = "001";

	// private String strRowKey;

	/*

	 * @Override protected void setup(Context context) {

	 * 

	 * tableName = context.getConfiguration().get("tableName"); cf =

	 * context.getConfiguration().get("cf"); rowKey =

	 * context.getConfiguration().get("rowkey"); split =

	 * context.getConfiguration().get("split"); }

	 */

	@Override

	protected void map(

			LongWritable key,

			Text value,

			Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context)

			throws IOException, InterruptedException {

		// 将rowkey 是数字并且带有","的分隔符去掉,获得对应的数字

		// get rkMaps indexa

		String strRowKey = "";

		String[] datas = value.toString().split("\t");

		// get rowkey

/*		for (String strIndex : rowKey.split(",")) {

			strRowKey = datas[Integer.valueOf(strIndex)] + split;

		}*/

		for (String strIndex : rowKey.split(",")) {

			if (null ==(strRowKey) || (null != strRowKey)&& "".equals(strRowKey)) {

				strRowKey = datas[Integer.valueOf(strIndex)];

				System.out.println(strRowKey +"strRowKey1");

			} else {

				strRowKey = strRowKey + split

						+ datas[Integer.valueOf(strIndex)];

				System.out.println(strRowKey +"strRowKey2");

			}

		}

		for (String str : strColumn.split(",")) {

			String[] columnTupe = str.split("\t");

			String columnData = datas[Integer.valueOf(columnTupe[0])];

			 String columnName = columnTupe[1];

			System.out.println(columnData + "columnDatacolumnData");

			ImmutableBytesWritable rk = new ImmutableBytesWritable(

					Bytes.toBytes(rowKey));

			// byte[] row, byte[] family, byte[] qualifier, byte[] value

			KeyValue kv = new KeyValue(Bytes.toBytes(strRowKey), // "a\001b\001\c\001"

					cf.getBytes(), Bytes.toBytes(columnName),

					Bytes.toBytes(columnData));

			context.write(rk, kv);

		}

	}

}

  

JDBCUtils类:

package cn.tansun.bd.utils;

import java.io.ByteArrayInputStream;

import java.io.IOException;

import java.io.InputStream;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.ResultSet;

import java.sql.ResultSetMetaData;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.Iterator;

import java.util.List;

import java.util.Map;

import java.util.Map.Entry;

import java.util.Properties;

import java.util.Set;

/**

 * @author 作者 E-mail: zgl

 * @version 创建时间:2016年6月23日 下午4:25:03 类说明

 */

public class JDBCUtils {

    public JDBCUtils()

    {

    }

    public static String PATH = "jdbc.properties";

    public static Properties prop;

    public static String url = null;

    public static String username = null;

    public static String password = null;

    public static Connection conn;

    public static Statement stmt;

    public static ResultSet rs;

    public static String fileName = null;

    static {

        try {

            InputStream inputStream = JDBCUtils.class.getClassLoader().getResourceAsStream( PATH );

            prop = new Properties();

            prop.load( inputStream );

            url = prop.getProperty( "jdbc.url" );

            username = prop.getProperty( "jdbc.username" );

            password = prop.getProperty( "jdbc.password" );

            if ( inputStream != null ) {

                inputStream.close();

            }

        }

        catch ( IOException e ) {

            e.printStackTrace();

        }

    }

    public static void closeConnection( Connection conn ) {

        if ( conn != null ) {

            try {

                conn.close();

            }

            catch ( SQLException e ) {

                e.printStackTrace();

            }

        }

    }

    /**

     * 根据sql语句查询

     * 

     * @param sql

     * @return

     */

    @SuppressWarnings( "rawtypes" )

    public static List<Map> selectDatas( String sql ) {

        List<Map> listDatas = new ArrayList<Map>();

        try {

            conn = DriverManager.getConnection( url, username, password );

            conn.setAutoCommit( false );

            stmt =

                conn.prepareStatement( "load data local infile ‘‘ " + "into table loadtest fields terminated by ‘,‘" );

            StringBuilder sb = new StringBuilder();

            InputStream is = new ByteArrayInputStream( sb.toString().getBytes() );

            ( (com.mysql.jdbc.Statement) stmt ).setLocalInfileInputStream( is );

            rs = stmt.executeQuery( sql );

            if ( rs != null ) {

                ResultSetMetaData metaData = rs.getMetaData();

                int count = metaData.getColumnCount();

                Map<String, Object> map = null;

                while ( rs.next() ) {

                    map = new HashMap<String, Object>();

                    for ( int i = 1; i < count + 1; i++ ) {

                        map.put( metaData.getColumnName( i ), rs.getObject( i ) );

                    }

                    listDatas.add( map );

                }

            }

        }

        catch ( SQLException e ) {

            e.printStackTrace();

        }

        return listDatas;

    }

    /**

     * 

     * @param sql

     * @return

     */

    public static List<String>  getStrMap( String  sql) {

         List<String> strList = new ArrayList<String>();

         try {

             conn = DriverManager.getConnection( url, username, password );

             conn.setAutoCommit( false );

             stmt =

                 conn.prepareStatement( "load data local infile ‘‘ " + "into table loadtest fields terminated by ‘,‘" );

             StringBuilder sb = new StringBuilder();

             InputStream is = new ByteArrayInputStream( sb.toString().getBytes() );

             ( (com.mysql.jdbc.Statement) stmt ).setLocalInfileInputStream( is );

             rs = stmt.executeQuery( sql );

             if ( rs != null ) {

                 ResultSetMetaData metaData = rs.getMetaData();

                 int count = metaData.getColumnCount();

                 while (rs.next()){

                 for (int i = 1; i < count + 1; i++){

                     //String str1 = metaData.getColumnName( i );

                     String str2 = (String) rs.getObject( i );

                     strList.add(str2);

                 }

              }

             }

         }

         catch ( SQLException e ) {

             e.printStackTrace();

         }

        return strList;

    }

    public static String table_name = null;

    public static String rowkey = null;

    public static String column_family = null;

    public static String column_name = null;

    private static String rows = null;

    public static String sql = null;

    public static String sql2 = null;

    @SuppressWarnings( "rawtypes" )

    public static void main( String[] args ) {

		sql2 = "SELECT   GROUP_CONCAT( DISTINCT aq.column_hive_index,‘  ‘,   aq.column_name ,‘ ‘    ORDER BY   "

				+ "    aq.column_hive_index SEPARATOR ‘,‘  ) AS column_names FROM  archive_hqualifier  aq "

				+ "where aq.table_id = 77 GROUP BY   aq.column_name ORDER BY aq.column_hive_index";

		sql ="SELECT DISTINCT 	s.tableName, 	ar.rowkey,	af.column_family,	"

				+ "aq.column_name FROM	archive_htable s,archive_hrowkey ar,archive_hfamily af,"

				+ " 	archive_hqualifier aq "

				+ "WHERE s .rowkey_id = ar.rowkey_id AND ar.family_id = af.family_id "

				+ "AND af.qualifier_id = aq.qualifier_id;";     

		String datas  = null;

		List<String> strList = getStrMap(sql);

		String substring  = null;

		 for (int i = 0; i < strList.size(); i++){

			datas = strList.get(i);

			//datas = strList.get(i).substring(0,   strList.get(i).length()-1);

			System.out.print(datas);

		 }
    }
}

  

时间: 2024-08-05 02:19:39

读取hive文件并将数据导入hbase的相关文章

数据导入HBase最常用的三种方式及实践分析

数据导入HBase最常用的三种方式及实践分析         摘要:要使用Hadoop,需要将现有的各种类型的数据库或数据文件中的数据导入HBase.一般而言,有三种常见方式:使用HBase的API中的Put方法,使用HBase 的bulk load工具和使用定制的MapReduce Job方式.本文均有详细描述. [编者按]要使用Hadoop,数据合并至关重要,HBase应用甚广.一般而言,需要 针对不同情景模式将现有的各种类型的数据库或数据文件中的数据转入至HBase 中.常见方式为:使用H

MapReduce生成HFile文件,再使用BulkLoad导入HBase中(完全分布式运行)

声明: 若要转载, 请标明出处. 前提: 在对于大量的数据导入到HBase中, 如果一条一条进行插入, 则太耗时了, 所以可以先采用MapReduce生成HFile文件, 然后使用BulkLoad导入HBase中. 引用: 一.这种方式有很多的优点: 1. 如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 "Bulk Loading"方法,即HBase提供的HFileOutputFormat类. 2. 它是利用hbase

Sqoop将mysql数据导入hbase的血与泪

Sqoop将mysql数据导入hbase的血与泪(整整搞了大半天)  版权声明:本文为yunshuxueyuan原创文章.如需转载请标明出处: https://my.oschina.net/yunshuxueyuan/blogQQ技术交流群:299142667 一. 问题如何产生 庞老师只讲解了mysql和hdfs,mysq与hive的数据互导,因此决定研究一下将mysql数据直接导入hbase,这时出现了一系列问题. 心酸史: 二. 开始具体解决问题 需求:(将以下这张表数据导入mysql)

[Python]将Excel文件中的数据导入MySQL

Github Link 需求 现有2000+文件夹,每个文件夹下有若干excel文件,现在要将这些excel文件中的数据导入mysql. 每个excel文件的第一行是无效数据. 除了excel文件中已有的数据,还要添加一列,名为“at_company”,值为821. 流程 (1)获取excel文件列表,并根据excel文件名确定之后需要创建的table名: (2)连接mysql (3)创建table (4)插入数据 (5)断开连接 依赖模块 1. xlrd # to read excel fil

使用sqoop将MySQL数据库中的数据导入Hbase

使用sqoop将MySQL数据库中的数据导入Hbase 前提:安装好 sqoop.hbase. 下载jbdc驱动:mysql-connector-java-5.1.10.jar 将 mysql-connector-java-5.1.10.jar 拷贝到 /usr/lib/sqoop/lib/ 下 MySQL导入HBase命令: sqoop import --connect jdbc:mysql://10.10.97.116:3306/rsearch --table researchers --h

将 text 文件里的数据导入到 mysql 数据库中

如题,将 text 文件里的数据导入到 mysql 数据库中. 我自己具体的实现可以分为几种了: 1.写你擅长的程序设计语言 进行读写文件,然后连接数据库,进行写入: 2.在 mysql 里直接进行运行 sql 脚本语句,进行导入. 第一个现在就不再说了,简单. 现在就说说怎么直接将 .text 文件利用 sql 语句 进行导入. 1.首先在数据库中新建一个表(这里的表至少要和 你数据里的字段进行匹配,即一行存在的字段数): 2.运行sql脚本语句: 比如: 你的文件为 D:/field.txt

Hive 数据导入HBase的2中方法详解

最近经常被问到这个问题,所以简单写一下总结. Hive数据导入到HBase基本有2个方案: 1.HBase中建表,然后Hive中建一个外部表,这样当Hive中写入数据后,HBase中也会同时更新 2.MapReduce读取Hive数据,然后写入(API或者Bulkload)到HBase 1.Hive 外部表 创建hbase表 (1) 建立一个表格classes具有1个列族user create 'classes','user' (2) 查看表的构造 hbase(main):005:0> desc

【甘道夫】通过bulk load将HDFS上的数据导入HBase

引言 通过bulkload将HDFS上的数据装载进HBase是常用的入门级HBase技能,下面简单记录下关键步骤. bulkload的详细情况请参见官网文档. 过程 第一步:每台机器执行 ln -s $HBASE_HOME/conf/hbase-site.xml $HADOOP_HOME/etc/hadoop/hbase-site.xml 第二步:编辑$HADOOP_HOME/etc/hadoop/hadoop-env.sh,拷贝到所有节点 末尾添加: export HADOOP_CLASSPA

使用sqlldr将文件中的数据导入到数据库

1.创建数据文件: ?如,在D:\创建 zhaozhenlong.txt 文件,文件内容为: 11,12,1321,22,2331,32,33 2.创建控制文件: 如,在D:\创建 zhaozhenlong.ctl 文件,文件内容为: load da tainfile 'd:\zhaozhenlong.txt'append into table zhaozhenlongfields terminated by ','(c1,c2,c3) 3.在数据库中创建表: create table zhao