【ODPS】阿里云ODPS中带分区的表操作

1.创建分区表:

分区表有自己的分区列,而分区表则没有。

public static void createTableWithPartition(Odps odps, String createTableName)
			throws Exception {
		Tables tables = odps.tables();// /获取表示ODPS全部Table的集合对象
		boolean a = tables.exists(createTableName);// 推断指定表test_table_jyl是否存在
		if (a) {
			System.out.println("指定表存在");
			Table table = tables.get(createTableName);
			tables.delete(createTableName);//存在就删除
		} else {
			System.out.println("指定表不存在");
		}
		System.out.println("-------------------------------------------------");

		/* 创建表 */
		if (tables.exists(createTableName)) {
			System.out.println("指定表存在,无法创建");
		} else {
			System.out.println("指定表不存在,能够创建");
			/* TableSchema表示ODPS中表的定义 */
			TableSchema tableSchema = new TableSchema();
			/* 加入列 */
			Column col; // Column表示ODPS中表的列定义
			col = new Column("id", OdpsType.STRING, "ID");
			tableSchema.addColumn(col);
			col = new Column("name", OdpsType.STRING, "姓名");
			tableSchema.addColumn(col);
			col = new Column("sex", OdpsType.BIGINT, "性别");
			tableSchema.addColumn(col);
			col = new Column("birthday", OdpsType.DATETIME, "生日");
			tableSchema.addColumn(col);

			/* 加入分区列 */
			col = new Column("province ", OdpsType.STRING, "省(分区列)");
			tableSchema.addPartitionColumn(col);

			tables.create(createTableName, tableSchema);//创建表
			System.out.println("表【" + createTableName + "】创建成功");
		}
		System.out.println("-------------------------------------------------");

	}

2.分区表数据上传:

分区表上传数据必须指定分区。所以上传数据前必须保证存在分区,不存在就创建一个,创建分区有两种方法

/*PartitionSpec类表示一个特定分区的定义*/
		String partitionColumn="province";//表中的分区列
		/*第一种,直接调用带參构造函数,
		 * 參数格式:分区定义字符串。比方: pt=‘1‘,ds=‘2‘
		 */
		PartitionSpec partitionSpec1 = new PartitionSpec(partitionColumn+"=‘hubei‘");

		/*另外一种,调用布带參数构造函数,再调用队形set方法。

*/
		PartitionSpec partitionSpec2 = new PartitionSpec();
		partitionSpec2.set(partitionColumn, "hubei");

TableTunnel类中有两个创建创建上传会话方法:

createUploadSession

public TableTunnel.UploadSession createUploadSession(String projectName,
                                                     String tableName)
                                              throws TunnelException
在非分区表上创建上传会话

Parameters:
projectName - Project名称
tableName - 表名,非视图
Returns:
TableTunnel.UploadSession
Throws:
TunnelException

createUploadSession

public TableTunnel.UploadSession createUploadSession(String projectName,
                                                     String tableName,
                                                     PartitionSpec partitionSpec)
                                              throws TunnelException
在分区表上创建上传会话

注: 分区必须为最末级分区,如表有两级分区pt,ds, 则必须所有指定值, 不支持仅仅指定当中一个值

Parameters:
projectName - Project名
tableName - 表名,非视图
partitionSpec - 指定分区 PartitionSpec
Returns:
TableTunnel.UploadSession
Throws:
TunnelException

分区表必须使用带分区的构造方法。还必须保证该分区存在,否则会报异常。

public static void uploadDataToYun(Odps odps, String project, String tableName)
			throws Exception {
		TableTunnel tunnel = new TableTunnel(odps);
		tunnel.setEndpoint(TUNNEL_URL);// 设置TunnelServer地址,没有设置TunnelServer地址的情况下自己主动选择

		/*PartitionSpec类表示一个特定分区的定义*/
		String partitionColumn="province";//表中的分区列
		PartitionSpec partitionSpec = new PartitionSpec();
		partitionSpec.set(partitionColumn, "hubei");

		Table table = odps.tables().get(tableName);//获取当前表
		boolean a= table.hasPartition(partitionSpec);//推断上述定义分区在表中是否存在
		if(a){
			System.out.println("分区已经存在,能够直接上传数据");
		}else{
			System.out.println("分区不存在,先创建分区再上传数据");
			table.createPartition(partitionSpec);
		}

		/*在分区表上创建上传会话*/
		TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(
				project, tableName,partitionSpec);

		RecordWriter rw = uploadSession.openRecordWriter(1);
		Column[] columns = new Column[4];
		columns[0] = new Column("id", OdpsType.STRING);
		columns[1] = new Column("name", OdpsType.STRING);
		columns[2] = new Column("sex", OdpsType.BIGINT);
		columns[3] = new Column("birthday", OdpsType.DATETIME);
		Record r = new ArrayRecord(columns);

		r.setString("id", "3");
		r.setString("name", "name3");
    	r.setBigint("sex", (long) 2);
    	Date date = new Date();
    	r.setDatetime("birthday", date);
    	rw.write(r);
    	rw.close();//一定要close,不然无法commit

		Long[] blocks = uploadSession.getBlockList();
		uploadSession.commit(blocks);
		System.out.println("数据上传成功");
	}

3.測试类:

	private static final String ACCESS_ID = "***********";
	private static final String ACCESS_KEY = "***************";
	private static final String PROJECT_NAME = "*************";
	private static final String TUNNEL_URL = "http://dt.odps.aliyun.com";
	private static final String ODPS_URL = "http://service.odps.aliyun.com/api";

	public static void main(String args[]) throws Exception {

		/* 先构建阿里云帐号 */
		Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);

		/* Odps类是ODPS SDK的入口 */
		Odps odps = new Odps(account);
		odps.setDefaultProject(PROJECT_NAME);// 指定默认使用的Project名称
		odps.setEndpoint(ODPS_URL);// 设置ODPS服务的地址

		String tableName="test_table_jyl";
		/*创建带分区的表*/
		createTableWithPartition(odps,tableName);

		/*上传数据*/
		uploadDataToYun(odps, PROJECT_NAME, tableName);
	}
时间: 2024-11-08 22:00:55

【ODPS】阿里云ODPS中带分区的表操作的相关文章

大数据平台Hive数据迁移至阿里云ODPS平台流程与问题记录

一.背景介绍 最近几天,接到公司的一个将当前大数据平台数据全部迁移到阿里云ODPS平台上的任务.而申请的这个ODPS平台是属于政务内网的,因考虑到安全问题当前的大数据平台与阿里云ODPS的网络是不通的,所以不能使用数据采集工作流模板. 然而,考虑到原大数据平台数据量并不是很大,可以通过将原大数据平台数据导出到CSV文件,然后再将CSV文件导入到ODPS平台.在这个过程中踩的坑有点多,所以想写篇文档作为记录. 二.大数据平台Hive数据导出到本地 编写export_data.sh脚本如下: #!/

搭建二级网站遇到的问题:二.如何在阿里云主机中设置二级域名开通子网站

如何在阿里云主机中设置二级域名开通子网站,这个问题涉及到两步:一首先要做二级域名的A记录解析到服务器IP:二配置Apache的配置文件httpd.conf,让服务器识别该二级域名,然后重新加载httpd.conf文件即可: 二级域名的A记录解析到服务器IP 二级域名的A记录解析需要修改你域名DNS解析,比如我们要设置wordpress.webzhe.com到阿里云的主机IP,可以添加wordpress的二级域名(主机记录),选择A记录后,设置IP为阿里云的IP即可(主机记录值):通常这个解析是很

阿里云centos中mysql的安装及一些常识知识

------------------------------------------------------------------- 阿里云centos中mysql的安装 工具WinSCP v5.7.0可以从windows上传文件到linux上 工具putty可以再windows上连接linux,用命令行操作很方便 将mysql以下三个包下载并上传到linux上,例如:/root/software MySQL-server-5.5.28-1.linux2.6.x86_64.rpm MySQL-

重新定义数据库的时刻,阿里云数据库专家带你了解POLARDB

摘要:POLARDB是阿里云ApsaraDB数据库团队研发的基于云计算架构的下一代关系型数据库,其最大的特色是计算节点与存储节点分离,借助优秀的RDMA网络以及最新的块存储技术.POLARDB不但满足了公有云计算环境下用户业务快速弹性扩展的刚性需求,同时也满足了互联网环境下用户对数据库服务器高可用的需求.本文就带领大家了解什么是"云原生数据库",云原生数据库的标准是什么,如何定义以及为何如此定义?为大家介绍下一代云原生数据库POLARDB的架构.产品设计.未来工作等内容. 以下内容根据

阿里云服务器中centos7 解决wdcp中不能远程访问mysql服务的问题

1.检查mysql是否开启了远程访问的权限 1.1 如果没有授予相应的权限则通过以下命令修改下: GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;//使用root用户从任何主机连接到mysql服务器. 允许某个用户从指定ip的主机连接到mysql服务器 GRANT ALL PRIVILEGES ON *.* TO 'jack'@'10.10.50.127' IDENTIFIED

阿里云odps介绍

https://help.aliyun.com/product/27797.htmlmaxCompute(大数据计算服务,原名ODPS)是一种快速.完全托管的TB/PB级数据仓库解决方案.MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全MaxCompute主要服务于批量结构化数据的存储和计算,可以提供海量数据仓库的解决方案以及针对大数据的分析建模服务.MaxCompute只能以表的形式存储数据,

阿里云ODPS操作技巧

语句操作: https://help.aliyun.com/document_detail/odps/SQL/ddl.html odps客户端: https://help.aliyun.com/document_detail/odps/tools/console/console.html?spm=5176.product8314999_odps.6.243.h3NEQt 在客户端可以查看运行的任务情况: show p from date1 to date2 number,eg:show p fr

阿里云CentOS中vsftp安装、配置、卸载

1--卸载 查看当前服务器中的vsftpdrpm -qa|grep vsftpd 例如结果为:vsftpd-2.2.2-13.el6_6.1.x86_64执行卸载rpm -e vsftpd-2.2.2-13.el6_6.1.x86_64返回:卸载时自动备份vsftp的用户列表文件warning: /etc/vsftpd/vsftpd.conf saved as /etc/vsftpd/vsftpd.conf.rpmsavewarning: /etc/vsftpd/user_list saved

阿里云centos中tomcat安装及开机自启动

------------------------------------------------------------------------------- 官网下载一个tomcat(如1.7的) apache-tomcat-7.0.62.tar.gz 使用工具将文件上传到centos中,我将文件全部放在了/usr/local中 使用putty工具登陆之后进入/usr/local/ 解压tomcat文件压缩包 tar -zxvf apache-tomcat-7.0.62.tar.gz 本级文件