Sqoop 1.99.4 Java API operations



There doesn't seem to be any write-up of Java client code for the 1.99.4 API out there yet, so I'll be the first to give it a try.



If your project uses Maven, add this dependency:

<dependency>
    <groupId>org.apache.sqoop</groupId>
    <artifactId>sqoop-client</artifactId>
    <version>1.99.4</version>
</dependency>

If it is a plain Java project,

just add every jar from the lib folder under the shell directory of the Sqoop 1.99.4 distribution to your classpath (the ones under server are not needed).
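One note before the code: the connector IDs hard-coded below (1 for the HDFS connector, 2 for the generic JDBC connector) are simply what my server happened to register; they can differ on another installation. A minimal sketch for checking them first, assuming the 1.99.4 client exposes SqoopClient.getConnectors() and MConnector.getUniqueName() (the class name here is just for the example):

package org.admln.sqoopOperate;

import java.util.Collection;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MConnector;

public class ListConnectors {
    public static void main(String[] args) {
        //point this at your own Sqoop 2 server
        SqoopClient client = new SqoopClient("http://hadoop:12000/sqoop/");
        //print every registered connector with its ID, then use those IDs in the jobs below
        Collection<MConnector> connectors = client.getConnectors();
        for (MConnector connector : connectors) {
            System.out.println(connector.getPersistenceId() + " : " + connector.getUniqueName());
        }
    }
}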



HDFS-MYSQL

package org.admln.sqoopOperate;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

public class HDFSToMysql {
    public static void main(String[] args) {
        sqoopTransfer();
    }
    public static void sqoopTransfer() {
        //initialization
        String url = "http://hadoop:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);

        //create the source link: HDFS
        long fromConnectorId = 1;
        MLink fromLink = client.createLink(fromConnectorId);
        fromLink.setName("HDFS connector");
        fromLink.setCreationUser("admln");
        MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/");
        Status fromStatus = client.saveLink(fromLink);
        if(fromStatus.canProceed()) {
            System.out.println("HDFS link created, ID: " + fromLink.getPersistenceId());
        } else {
            System.out.println("Failed to create the HDFS link");
        }
        //create the destination link: JDBC
        long toConnectorId = 2;
        MLink toLink = client.createLink(toConnectorId);
        toLink.setName("JDBC connector");
        toLink.setCreationUser("admln");
        MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive");
        toLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        toLinkConfig.getStringInput("linkConfig.username").setValue("hive");
        toLinkConfig.getStringInput("linkConfig.password").setValue("hive");
        Status toStatus = client.saveLink(toLink);
        if(toStatus.canProceed()) {
            System.out.println("JDBC link created, ID: " + toLink.getPersistenceId());
        } else {
            System.out.println("Failed to create the JDBC link");
        }

        //create the job
        long fromLinkId = fromLink.getPersistenceId();
        long toLinkId = toLink.getPersistenceId();
        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("HDFS to MySQL job");
        job.setCreationUser("admln");
        //configure the FROM side of the job
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/out/aboutyunLog/HiveExport/ipstatistical/data");

        //configure the TO side of the job
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.schemaName").setValue("aboutyunlog");
        toJobConfig.getStringInput("toJobConfig.tableName").setValue("ipstatistical");
        //toJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
        // set the driver config values
        //MDriverConfig driverConfig = job.getDriverConfig();
        //driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3"); //numExtractors controls how many extractors (map tasks) the job uses; see the MySQL-to-HDFS example below
        Status status = client.saveJob(job);
        if(status.canProceed()) {
            System.out.println("Job created, ID: " + job.getPersistenceId());
        } else {
            System.out.println("Failed to create the job");
        }

        //start the job
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob(jobId);
        System.out.println("Job submission status: " + submission.getStatus());
        while(submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
            //report progress every three seconds
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //refresh the submission from the server so the loop sees the current status and progress
            submission = client.getJobStatus(jobId);
        }
        System.out.println("Job finished.");
        System.out.println("Hadoop job ID: " + submission.getExternalId());
        Counters counters = submission.getCounters();
        if(counters != null) {
            System.out.println("Counters:");
            for(CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for(Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }
        if(submission.getExceptionInfo() != null) {
            System.out.println("The job threw an exception: " + submission.getExceptionInfo());
        }
        System.out.println("HDFS to MySQL transfer via Sqoop is done");
    }
}
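Each run of the program above registers a new pair of links and a new job on the Sqoop server. A minimal cleanup sketch, assuming the 1.99.4 client also exposes deleteJob(long) and deleteLink(long); the IDs are the ones printed when the objects were created:

//delete the job first, then the links it references
SqoopClient client = new SqoopClient("http://hadoop:12000/sqoop/");
client.deleteJob(1);   //job ID printed above
client.deleteLink(1);  //HDFS link ID printed above
client.deleteLink(2);  //JDBC link ID printed above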


MYSQL-HDFS

package org.admln.sqoopOperate;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

public class MysqlToHDFS {
    public static void main(String[] args) {
        sqoopTransfer();
    }
    public static void sqoopTransfer() {
        //initialization
        String url = "http://hadoop:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);

        //create the source link: JDBC
        long fromConnectorId = 2;
        MLink fromLink = client.createLink(fromConnectorId);
        fromLink.setName("JDBC connector");
        fromLink.setCreationUser("admln");
        MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive");
        fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        fromLinkConfig.getStringInput("linkConfig.username").setValue("hive");
        fromLinkConfig.getStringInput("linkConfig.password").setValue("hive");
        Status fromStatus = client.saveLink(fromLink);
        if(fromStatus.canProceed()) {
            System.out.println("JDBC link created, ID: " + fromLink.getPersistenceId());
        } else {
            System.out.println("Failed to create the JDBC link");
        }
        //create the destination link: HDFS
        long toConnectorId = 1;
        MLink toLink = client.createLink(toConnectorId);
        toLink.setName("HDFS connector");
        toLink.setCreationUser("admln");
        MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/");
        Status toStatus = client.saveLink(toLink);
        if(toStatus.canProceed()) {
            System.out.println("HDFS link created, ID: " + toLink.getPersistenceId());
        } else {
            System.out.println("Failed to create the HDFS link");
        }

        //create the job
        long fromLinkId = fromLink.getPersistenceId();
        long toLinkId = toLink.getPersistenceId();
        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("MySQL to HDFS job");
        job.setCreationUser("admln");
        //configure the FROM side of the job
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
        fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
        //configure the TO side of the job
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");
        //driver config: numExtractors controls how many extractors (map tasks) the job uses
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");

        Status status = client.saveJob(job);
        if(status.canProceed()) {
            System.out.println("Job created, ID: " + job.getPersistenceId());
        } else {
            System.out.println("Failed to create the job");
        }

        //start the job
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob(jobId);
        System.out.println("Job submission status: " + submission.getStatus());
        while(submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
            //report progress every three seconds
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //refresh the submission from the server so the loop sees the current status and progress
            submission = client.getJobStatus(jobId);
        }
        System.out.println("Job finished.");
        System.out.println("Hadoop job ID: " + submission.getExternalId());
        Counters counters = submission.getCounters();
        if(counters != null) {
            System.out.println("Counters:");
            for(CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for(Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }
        if(submission.getExceptionInfo() != null) {
            System.out.println("The job threw an exception: " + submission.getExceptionInfo());
        }
        System.out.println("MySQL to HDFS transfer via Sqoop is done");
    }
}
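The submitting JVM does not have to stay alive: the job keeps running on the Sqoop server and can be polled or aborted later by its ID. A minimal sketch, assuming the 1.99.4 client exposes getJobStatus(long) and stopJob(long):

//check on (and optionally abort) a previously submitted job by its ID
SqoopClient client = new SqoopClient("http://hadoop:12000/sqoop/");
long jobId = 1; //the ID printed when the job was created

MSubmission submission = client.getJobStatus(jobId);
System.out.println("Status: " + submission.getStatus());
if (submission.getStatus().isRunning() && submission.getProgress() != -1) {
    System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
}
//client.stopJob(jobId); //abort the job if needed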


Don't ask why there is no code for moving data between MySQL and HBase or Hive.



20150102

