Hbase数据导入导出

平时用于从生产环境hbase到导出数据到测试环境。

导入数据:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

@SuppressWarnings("deprecation")
public class HbaseImport {

    public static void main(String args[]) throws Exception{
        Configuration conf = HBaseConfiguration.create();
        HConnection connection = HConnectionManager.createConnection(conf);
        List<Map<String,String>> datas = getDatas("d:\\hbaseData\\datas.txt",connection);
        wirteHbase(connection,"EVENT_LOG_LBS",datas);
    }

    public static List<Map<String,String>> getDatas(String filePath,HConnection connection) throws IOException{
        List<Map<String,String>> datas = new ArrayList<Map<String,String>>();
        File file = new File(filePath);
        BufferedReader br = new BufferedReader(new FileReader(file));
        String tr = null;
        while(((tr = br.readLine()) != null)){
            String subData = tr.substring(1);
            Map<String,String> data = new HashMap<String,String>();
            String[] ss = subData.split("\\|");
            for(String s : ss){
                String[] tds = s.split("=");
                String v = "";
                if(tds.length == 2){
                    v = tds[1];
                }
                data.put(tds[0], v);
            }
            datas.add(data);
        }
        br.close();
        return datas;
    }

    public static void wirteHbase(HConnection connection,String tableName,List<Map<String,String>>datas) throws IOException{
        HTableInterface t = connection.getTable(tableName);
        for(Map<String,String> map : datas){
            Set<String> ks = map.keySet();
            Put put = new Put(Bytes.toBytes(map.get("rowkey")));
            for(String key : ks){
                put.add(Bytes.toBytes("f1"),Bytes.toBytes(key),Bytes.toBytes(map.get(key)));
            }
            t.put(put);
        }
    }
}

导出数据:

package hbase;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

@SuppressWarnings("deprecation")
public class HbaseExport {

	public static Date getPassSevenDays(int day){
		Calendar calendar = Calendar.getInstance();
		int year = calendar.get(Calendar.YEAR);
		int dayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
		int j = 0;
		for(int i = 0;i < day; i++){
			calendar.set(Calendar.DAY_OF_YEAR, dayOfYear - j);
			if(calendar.get(Calendar.YEAR) < year){
				//跨年了
				j = 1;
				//更新 标记年
				year = year + 1;
				//重置日历
				calendar.set(year, Calendar.DECEMBER,31);
				//重新获取dayOfYear
				dayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
			}else{
				j = j + 1;
			}
		}
		return calendar.getTime();
	}

	public static Scan setScanCondition(Scan scan) throws IOException{
		Date newDay = new Date();
		Date otherDays = getPassSevenDays(7);

		scan.setTimeRange(otherDays.getTime(),newDay.getTime());
		scan.addColumn("f1".getBytes(), "LS_certifier_no".getBytes());
		scan.addColumn("f1".getBytes(), "LS_location".getBytes());
		scan.addColumn("f1".getBytes(), "LS_phone_no".getBytes());
		scan.addColumn("f1".getBytes(), "LS_longitude".getBytes());
		scan.addColumn("f1".getBytes(), "LS_latitude".getBytes());
		scan.addColumn("f1".getBytes(), "date".getBytes());
		scan.addColumn("f1".getBytes(), "time".getBytes());
		scan.addColumn("f1".getBytes(), "hourOfDay".getBytes());

		return scan;
	}

	public static void main(String args[]) throws IOException{
		Configuration conf = HBaseConfiguration.create();
		HConnection connection = HConnectionManager.createConnection(conf);
		String tableName = "EVENT_LOG_LBS_HIS";
		HTableInterface table = connection.getTable(tableName);

		Scan scan = new Scan();
		setScanCondition(scan);

		ResultScanner rs = table.getScanner(scan);
		for(Result r : rs){
			List<String> lines = new ArrayList<String>();
			StringBuilder sb = new StringBuilder();
			sb.append(" rowkey=" + Bytes.toString(r.getRow()));
			for(Cell cell : r.rawCells()){
				String name = Bytes.toString(CellUtil.cloneQualifier(cell));
				String value = Bytes.toString(CellUtil.cloneValue(cell));
				System.out.println(name + "=" + value);
				sb.append("|" + name + "=" + value);
			}
			lines.add(sb.toString());
			System.out.println("--------------------------");
			writeFile(lines,"/home/hdfs/datas");
		}

	}

	public static void writeFile(List<String> lines,String filePath) throws FileNotFoundException{
		File file = new File(filePath);
		PrintWriter pw = new PrintWriter(new FileOutputStream(file,true));
		for(String line : lines){
			pw.append(line);
			pw.append("\n");
		}
		pw.flush();
		pw.close();
	}

}

  

时间: 2024-10-17 05:13:27

Hbase数据导入导出的相关文章

HData——ETL 数据导入/导出工具

HData是一个异构的ETL数据导入/导出工具,致力于使用一个工具解决不同数据源(JDBC.Hive.HDFS.HBase.MongoDB.FTP.Http.CSV.Excel.Kafka等)之间数据交换的问题.HData在设计上同时参考了开源的Sqoop.DataX,却与之有不同的实现.HData采用“框架+插件”的结构,具有较好的扩展性,框架相当于数据缓冲区,插件则为访问不同的数据源提供实现. [HData特性] 1.异构数据源之间高速数据传输: 2.跨平台独立运行: 3.数据传输过程全内存

Sqoop -- 用于Hadoop与关系数据库间数据导入导出工作的工具

转:https://blog.csdn.net/qx12306/article/details/67014096 Sqoop是一款开源的工具,主要用于在Hadoop相关存储(HDFS.Hive.HBase)与传统关系数据库(MySql.Oracle等)间进行数据传递工作.Sqoop最早是作为Hadoop的一个第三方模块存在,后来被独立成为了一个Apache项目.除了关系数据库外,对于某些NoSQL数据库,Sqoop也提供了连接器. 一.Sqoop基础知识 Sqoop项目开始于2009年,可以在H

Oracle数据导入导出基本操作示例

Oracle数据导入导出基本操作示例 数据导出 a.将数据库orcl完全导出,用户名user 密码password 导出到D:\dc.dmp中 exp user/[email protected]   file=d:\dc.dmp    full=y full=y   表示全库导出 b.将数据库中user1和user2用户导出 exp user/[email protected]  file=d:\dc.dmp    owner=(user1,user2) full方式可以备份所有用户的数据库对

考试系统维护--不同版本SQL数据导入导出

考试系统维护过程中,为了保证考试的顺利进行需要在多个服务器上搭建考试系统(备份),这时候需要把数据库来回迁移,之前我们常用的数据库备份还原的方法确实简单方便,但是遇到不同的服务器安装的SQL版本不同就歇菜了,虽然当时为了以后操作方便,我们把这次要用的服务器的数据库版本都统一了,但是在考试系统维护中米老师让我感触最深的一点-----"凡事多想一点!"多思考必须要应用到实际,所以我回来又仔细研究了几种不同版本SQL数据导入导出的方法,与大家交流提高. 一:使用SQLServer Impor

excel数据导入导出数据库

第一种方法: 先把Excel另存为.csv格式文件,如test.csv,再编写一个insert.ctl 用sqlldr进行导入! insert.ctl内容如下: load data          --1.控制文件标识 infile ‘my.csv‘          --2.要输入的数据文件名为my.csv append into table "tbl_test"   --3.向表table_name中追加记录 fields terminated by ‘,‘          

Oracle11g和10g数据导入导出

背景:Oracle数据导入导出imp/exp就相当于oracle数据还原与备份.exp命令可以把数据从远程数据库服务器导出到本地的dmp文件,imp命令可以把dmp文件从本地导入到远处的数据库服务器中.下面总结下具体的导入导出步骤: ------------------------------------------------------------------------------------------- #11g新特性数据库使用数据泵expdp&impdp导出导入操作# -------

Linux下mongodb安装及数据导入导出教程

Linux下mongodb安装及数据导入导出教程 #查看linux发行版本 cat /etc/issue #查看linux内核版本号 uname -r 一.Linux下mongodb安装的一般步骤 1.到mongodb的官网(https://www.mongodb.org/downloads) 下载相应你系统的安装包,拷贝(能够用ftp工具如winscp)到你的linux系统上面. 2.解压相应的安装包 命令例如以下:tar zxvf mongodb-linux-x86_64-3.0.4.tgz

Oracle数据导入导出imp/exp

在oracle安装目录下有EXP.EXE与IMP.EXE这2个文件,他们分别被用来执行数据库的导入导出.所以Oracle数据导入导出imp/exp就相当与oracle数据还原与备份. Oracle数据导出exp Exp参数详解: USERID 运行导出命令的帐号的用户名/口令 BUFFEER 用来取数据行的缓冲区的大小 FILE 导出转储文件的名字 COMPRESS 导出是否应该压缩有碎片的段成一个范围,这将会影响STORAGE子句 GRANTS 导出时否要导出数据库对象上的授权 INDEXES

Hive 实战(1)--hive数据导入/导出基础

前沿: Hive也采用类SQL的语法, 但其作为数据仓库, 与面向OLTP的传统关系型数据库(Mysql/Oracle)有着天然的差别. 它用于离线的数据计算分析, 而不追求高并发/低延时的应用场景. 最显著的特别是, Hive的数据是Schema On Read, 对数据的写入非常的自由和松散, 而对数据的读取则作了各种限制. 而RMDBS则是Schema On Write, 对数据写入限制非常的严苛. *). 数据导入/导出 让我们体验以下Hive中数据如何导入: 1). 创建数据库 db_