1.创建分区表:
分区表有自己的分区列,而分区表则没有。
public static void createTableWithPartition(Odps odps, String createTableName) throws Exception { Tables tables = odps.tables();// /获取表示ODPS全部Table的集合对象 boolean a = tables.exists(createTableName);// 推断指定表test_table_jyl是否存在 if (a) { System.out.println("指定表存在"); Table table = tables.get(createTableName); tables.delete(createTableName);//存在就删除 } else { System.out.println("指定表不存在"); } System.out.println("-------------------------------------------------"); /* 创建表 */ if (tables.exists(createTableName)) { System.out.println("指定表存在,无法创建"); } else { System.out.println("指定表不存在,能够创建"); /* TableSchema表示ODPS中表的定义 */ TableSchema tableSchema = new TableSchema(); /* 加入列 */ Column col; // Column表示ODPS中表的列定义 col = new Column("id", OdpsType.STRING, "ID"); tableSchema.addColumn(col); col = new Column("name", OdpsType.STRING, "姓名"); tableSchema.addColumn(col); col = new Column("sex", OdpsType.BIGINT, "性别"); tableSchema.addColumn(col); col = new Column("birthday", OdpsType.DATETIME, "生日"); tableSchema.addColumn(col); /* 加入分区列 */ col = new Column("province ", OdpsType.STRING, "省(分区列)"); tableSchema.addPartitionColumn(col); tables.create(createTableName, tableSchema);//创建表 System.out.println("表【" + createTableName + "】创建成功"); } System.out.println("-------------------------------------------------"); }
2.分区表数据上传:
分区表上传数据必须指定分区。所以上传数据前必须保证存在分区,不存在就创建一个,创建分区有两种方法
/*PartitionSpec类表示一个特定分区的定义*/ String partitionColumn="province";//表中的分区列 /*第一种,直接调用带參构造函数, * 參数格式:分区定义字符串。比方: pt=‘1‘,ds=‘2‘ */ PartitionSpec partitionSpec1 = new PartitionSpec(partitionColumn+"=‘hubei‘"); /*另外一种,调用布带參数构造函数,再调用队形set方法。 */ PartitionSpec partitionSpec2 = new PartitionSpec(); partitionSpec2.set(partitionColumn, "hubei");
TableTunnel类中有两个创建创建上传会话方法:
createUploadSession
public TableTunnel.UploadSession createUploadSession(String projectName, String tableName) throws TunnelException
- 在非分区表上创建上传会话
-
- Parameters:
projectName
- Project名称tableName
- 表名,非视图- Returns:
TableTunnel.UploadSession
- Throws:
TunnelException
createUploadSession
public TableTunnel.UploadSession createUploadSession(String projectName, String tableName, PartitionSpec partitionSpec) throws TunnelException
- 在分区表上创建上传会话
注: 分区必须为最末级分区,如表有两级分区pt,ds, 则必须所有指定值, 不支持仅仅指定当中一个值
- Parameters:
projectName
- Project名tableName
- 表名,非视图partitionSpec
- 指定分区PartitionSpec
- Returns:
TableTunnel.UploadSession
- Throws:
TunnelException
分区表必须使用带分区的构造方法。还必须保证该分区存在,否则会报异常。
public static void uploadDataToYun(Odps odps, String project, String tableName) throws Exception { TableTunnel tunnel = new TableTunnel(odps); tunnel.setEndpoint(TUNNEL_URL);// 设置TunnelServer地址,没有设置TunnelServer地址的情况下自己主动选择 /*PartitionSpec类表示一个特定分区的定义*/ String partitionColumn="province";//表中的分区列 PartitionSpec partitionSpec = new PartitionSpec(); partitionSpec.set(partitionColumn, "hubei"); Table table = odps.tables().get(tableName);//获取当前表 boolean a= table.hasPartition(partitionSpec);//推断上述定义分区在表中是否存在 if(a){ System.out.println("分区已经存在,能够直接上传数据"); }else{ System.out.println("分区不存在,先创建分区再上传数据"); table.createPartition(partitionSpec); } /*在分区表上创建上传会话*/ TableTunnel.UploadSession uploadSession = tunnel.createUploadSession( project, tableName,partitionSpec); RecordWriter rw = uploadSession.openRecordWriter(1); Column[] columns = new Column[4]; columns[0] = new Column("id", OdpsType.STRING); columns[1] = new Column("name", OdpsType.STRING); columns[2] = new Column("sex", OdpsType.BIGINT); columns[3] = new Column("birthday", OdpsType.DATETIME); Record r = new ArrayRecord(columns); r.setString("id", "3"); r.setString("name", "name3"); r.setBigint("sex", (long) 2); Date date = new Date(); r.setDatetime("birthday", date); rw.write(r); rw.close();//一定要close,不然无法commit Long[] blocks = uploadSession.getBlockList(); uploadSession.commit(blocks); System.out.println("数据上传成功"); }
3.測试类:
private static final String ACCESS_ID = "***********"; private static final String ACCESS_KEY = "***************"; private static final String PROJECT_NAME = "*************"; private static final String TUNNEL_URL = "http://dt.odps.aliyun.com"; private static final String ODPS_URL = "http://service.odps.aliyun.com/api"; public static void main(String args[]) throws Exception { /* 先构建阿里云帐号 */ Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY); /* Odps类是ODPS SDK的入口 */ Odps odps = new Odps(account); odps.setDefaultProject(PROJECT_NAME);// 指定默认使用的Project名称 odps.setEndpoint(ODPS_URL);// 设置ODPS服务的地址 String tableName="test_table_jyl"; /*创建带分区的表*/ createTableWithPartition(odps,tableName); /*上传数据*/ uploadDataToYun(odps, PROJECT_NAME, tableName); }
时间: 2024-11-08 22:00:55