在不同版本hdfs集群之间转移数据

最简单的办法就是把src集群的数据导到本地,然后起另一个进程将本地数据传到des集群上去。

不过这有几个问题:

  • 效率降低
  • 占用本地磁盘空间
  • 不能应付实时导数据需求
  • 两个进程需要协调,复杂度增加

更好的办法是在同一个进程内一边读src数据,一边写des集群。不过这相当于在同一个进程空间内加载两个版本的hadoop jar包,这就需要在程序中使用两个classloader来实现。

以下代码可以实现classloader加载自定义的jar包,并生成需要的Configuration对象:

Java代码

  • URL[] jarUrls = new URL[1];
  • jarUrls[0]=new File(des_jar_path).toURI().toURL();
  • ClassLoader jarloader = new URLClassLoader(jarUrls, null);
  • Class Proxy = Class.forName("yourclass", true, jarloader);
  • Configuration conf = (Configuration)Proxy.newInstance();

URL[] jarUrls = new URL[1];

jarUrls[0]=newFile(des_jar_path).toURI().toURL();

ClassLoader jarloader = newURLClassLoader(jarUrls, null);

Class Proxy =Class.forName("yourclass", true, jarloader);

Configuration conf =(Configuration)Proxy.newInstance();

但是由于在生成HTable对象时,需要使用这个conf对象,而加载这个conf对象的代码本身是由默认的classloader加载的,也就是0.19.2的jar包。所以在以上代码最后一行所强制转换的Configuration对象仍然是0.19.2版本的。那怎么办呢?

琢磨了一会,发现如果要实现以上功能,必须将生成HTable对象,以及以后的所有hbase操作都使用这个新的classloader,因此这个新的classloader必须加载除了0.19.2的jar包外所有需要用到的jar包,然后把所有操作都封装进去。在外面用反射来调用。

这样的话,通常构造函数都不为空了,因此需要用到Constructor来构造一个自定义的构造函数

代码段如下:

Java代码

  • main.java
  • void init(){
  • ClassLoader jarloader = generateJarLoader();
  • Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);
  • Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});
  • Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);
  • proxy = con.newInstance(new Object[]{path, tablename, autoflush});
  • }
  • void put(){
  • ...
  • while((line = getLine()) != null) {
  • proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));
  • Method addPut = proxy.getClass().getMethod("addPut",
  • new Class[]{String.class, String.class, String.class});
  • addPut.invoke(proxy, new Object[]{field, column, encode});
  • proxy.getClass().getMethod("putLine").invoke(proxy);
  • }
  • }
  • ClassLoader generateJarLoader() throws IOException {
  • String libPath = System.getProperty("java.ext.dirs");
  • FileFilter filter = new FileFilter() {
  • @Override
  • public boolean accept(File pathname) {
  • if(pathname.getName().startsWith("hadoop-0.19.2"))
  • return false;
  • else
  • return pathname.getName().endsWith(".jar");
  • }
  • };
  • File[] jars = new File(libPath).listFiles(filter);
  • URL[] jarUrls = new URL[jars.length+1];
  • int k = 0;
  • for (int i = 0; i < jars.length; i++) {
  • jarUrls[k++] = jars.toURI().toURL();
  • }
  • jarUrls[k] = new File("hadoop-0.20.205.jar")
  • ClassLoader jarloader = new URLClassLoader(jarUrls, null);
  • return jarloader;
  • }

main.java

void init(){

ClassLoader jarloader = generateJarLoader();

Class Proxy =Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);

Constructor con = Proxy.getConstructor(new Class[]{String.class,String.class, boolean.class});

Boolean autoflush =param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);

proxy = con.newInstance(new Object[]{path, tablename, autoflush});

}

void put(){

...

while((line = getLine()) != null) {

proxy.getClass().getMethod("generatePut",String.class).invoke(proxy,line.getField(rowkey));

Method addPut = proxy.getClass().getMethod("addPut",

new Class[]{String.class, String.class, String.class});

addPut.invoke(proxy, new Object[]{field, column, encode});

proxy.getClass().getMethod("putLine").invoke(proxy);

}

}

ClassLoader generateJarLoader()throws IOException {

String libPath =System.getProperty("java.ext.dirs");

FileFilter filter = new FileFilter() {

@Override

public boolean accept(File pathname) {

if(pathname.getName().startsWith("hadoop-0.19.2"))

return false;

else

returnpathname.getName().endsWith(".jar");

}

};

File[] jars = newFile(libPath).listFiles(filter);

URL[] jarUrls = new URL[jars.length+1];

int k = 0;

for (int i = 0; i < jars.length; i++){

jarUrls[k++] = jars.toURI().toURL();

}

jarUrls[k] = newFile("hadoop-0.20.205.jar")

ClassLoader jarloader = newURLClassLoader(jarUrls, null);

return jarloader;

}

Java代码

  • HBaseProxy.java
  • public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)
  • throws IOException{
  • Configuration conf = new Configuration();
  • conf.addResource(new Path(hbase_conf));
  • config = new Configuration(conf);
  • htable = new HTable(config, tableName);
  • admin = new HBaseAdmin(config);
  • htable.setAutoFlush(autoflush);
  • }
  • public void addPut(String field, String column, String encode) throws IOException {
  • try {
  • p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
  • field.getBytes(encode));
  • } catch (UnsupportedEncodingException e) {
  • p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),
  • field.getBytes());
  • }
  • }
  • public void generatePut(String rowkey){
  • p = new Put(rowkey.getBytes());
  • }
  • public void putLine() throws IOException{
  • htable.put(p);
  • }

HBaseProxy.java

public HBaseProxy(Stringhbase_conf, String tableName, boolean autoflush)

throws IOException{

Configuration conf = new Configuration();

conf.addResource(new Path(hbase_conf));

config = new Configuration(conf);

htable = new HTable(config, tableName);

admin = new HBaseAdmin(config);

htable.setAutoFlush(autoflush);

}

public void addPut(Stringfield, String column, String encode) throws IOException {

try {

p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),

field.getBytes(encode));

} catch (UnsupportedEncodingException e) {

p.add(column.split(":")[0].getBytes(),column.split(":")[1].getBytes(),

field.getBytes());

}

}

public void generatePut(String rowkey){

p = new Put(rowkey.getBytes());

}

public void putLine() throws IOException{

htable.put(p);

}

总之,在同一个进程中加载多个classloader时一定要注意,classloader A所加载的对象是不能转换成classloader B的对象的,当然也不能使用。两个空间的相互调用只能用java的基本类型或是反射。

更多精彩内容请关注:http://bbs.superwu.cn

关注超人学院微信二维码:

关注超人学院java免费学习交流群:

时间: 2024-10-11 10:54:39

在不同版本hdfs集群之间转移数据的相关文章

在不同版本号hdfs集群之间转移数据

最简单的办法就是把src集群的数据导到本地,然后起还有一个进程将本地数据传到des集群上去. 只是这有几个问题: 效率减少 占用本地磁盘空间 不能应付实时导数据需求 两个进程须要协调,复杂度添加 更好的办法是在同一个进程内一边读src数据,一边写des集群.只是这相当于在同一个进程空间内载入两个版本号的hadoop jar包.这就须要在程序中使用两个classloader来实现. 下面代码能够实现classloader载入自己定义的jar包,并生成须要的Configuration对象: Java

HDFS集群和YARN集群

Hadoop集群环境搭建(一) 1集群简介 HADOOP集群具体来说包含两个集群:HDFS集群和YARN集群,两者逻辑上分离,但物理上常在一起 HDFS集群: 负责海量数据的存储,集群中的角色主要有 NameNode / DataNode YARN集群: 负责海量数据运算时的资源调度,集群中的角色主要有 ResourceManager /NodeManager 本集群搭建案例,以3节点为例进行搭建,角色分配如下: hdp-node-01 NameNode SecondaryNameNode Re

在Docker下搭建Spark+HDFS集群

在Docker下搭建Spark+HDFS集群 1.      在VM中安装Ubuntu OS并实现root登录 (http://jingyan.baidu.com/article/148a1921a06bcb4d71c3b1af.html) 安装Vm增强工具 http://www.jb51.net/softjc/189149.html 2.      安装Docker docker安装方法一 ubuntu14.04以上的版本都是自带docker安装包的:所以可以直接安装:但是这个一般不是最先版本

Linux环境下HDFS集群环境搭建关键步骤

Linux环境下HDFS集群环境搭建关键步骤记录. 介质版本:hadoop-2.7.3.tar.gz 节点数量:3节点. 一.下载安装介质 官网下载地址:http://hadoop.apache.org/releases.html 二.服务器规划 MASTER:NAMENODE, DATANODENODE1:DATANODENODE2:SECONDARY NAMENODE, DATANODE 三.配置hostname和hosts 192.168.13.4 master192.168.13.5 n

HDFS集群中DataNode的上线与下线

在HDFS集群的运维过程中,肯定会遇到DataNode的新增和删除,即上线与下线.这篇文章就详细讲解下DataNode的上线和下线的过程. 背景 在我们的微职位视频课程中,我们已经安装了3个节点的HDFS集群,master机器上安装了NameNode和SecondaryNameNode角色,slave1和slave2两台机器上分别都安装了DataNode角色. 我们现在来给这个HDFS集群新增一个DataNode,这个DataNode是安装在master机器上 我们需要说明的是:在实际环境中,N

八百里流媒体服务器系统单机版本和集群版本的对比

苏州八百里网络科技有限公司作为专业的流媒体音视频技术解决方案的提供商,针对用户并发,功能和应用需求不同,开发了两个版本的流媒体服务器系统,提供局域网或互联网的高清标清网络直播和点播. 流媒体服务器系统提供: 嵌入网页的HTML代码和视频播放地址. 支持PC和手机端(安卓和苹果)的网页观看,应用了HLS协议和H5的技术. 接收标准的RTMP推送网络直播流并按照各种协议进行网络数据分发. 具有先进的Flash P2P技术,可以节省大量的带宽成本. 视频分片存储并加密同时提供防盗链,让独有的视频内容得

Mongo 3.6.1版本Sharding集群配置

Mongo低版本和高版本的sharding集群配置,细节不太一样.目前网上的配置文档大都是针对低版本的.本人在配置3.6.1版本的mongosharding集群的过程中,碰到不少问题,官方文档没有直观的示例,参考起来有点一头雾水.特整理记录下自己的测试环境sharding集群配置过程,供大家参考. Mongo sharding集群由config server,mongos(路由)及shards服务器组成.他们的关系及扮演的角色,网上到处都是,不再详细介绍. 最开始,打算将config.mongo

HDFS集群安装部署

1-> 集群的准备工作 1)关闭防火墙(进行远程连接) systemctl stop firewalld  systemctl -disable firewalld 2)永久修改设置主机名  vi /etc/hostname 注意:需要重启生效->reboot 3)配置映射文件 vi /etc/hosts #127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4 #::1         lo

搭建HDFS集群

hdfs集群组成结构: 安装hdfs集群的具体步骤: 1.学习阶段,用虚拟机即可! 先准备4台虚拟机:1个namenode节点  + 3 个datanode 节点 2.修改各台机器的主机名和ip地址 主机名:hdp-01  对应的ip地址:192.168.33.61 主机名:hdp-02  对应的ip地址:192.168.33.62 主机名:hdp-03  对应的ip地址:192.168.33.63 主机名:hdp-04  对应的ip地址:192.168.33.64 Vi /etc/udev/r