Prerequisites: a running Hadoop cluster, with core-site.xml, hdfs-site.xml, mapred-site.xml, and yarn-site.xml copied onto the classpath, for example into src/main/resources.
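1. Obtain a GetNewApplicationRequest. The concrete instance is the protobuf class GetNewApplicationRequestPBImpl; other serialization formats may be supported in the future. The serialization format determines which RPC factory is used and which kind of serializable classes it produces.
Records.newRecord simply instantiates the protobuf-backed class. Note that if you write your own record types, the protobuf implementation must follow some naming rules: the class name must end with "PBImpl" and the package name must end with "impl.pb"; otherwise the corresponding protobuf class cannot be found.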
    GetNewApplicationRequest request = Records
            .newRecord(GetNewApplicationRequest.class);
    Configuration conf = new Configuration();
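To make the naming rule concrete, here is a minimal sketch of how an implementation class name can be derived from a record interface name. It is a simplified illustration of the convention described above, not the actual Hadoop source; the class PbImplNaming and the method implClassName are hypothetical names introduced only for this example.

```java
// Simplified sketch of the "impl.pb" / "PBImpl" naming convention.
// PbImplNaming and implClassName are hypothetical names, not Hadoop APIs.
final class PbImplNaming {
    static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb";
    static final String PB_IMPL_CLASS_SUFFIX = "PBImpl";

    /** Derives the expected implementation class name from a record interface's fully qualified name. */
    static String implClassName(String recordInterfaceName) {
        int lastDot = recordInterfaceName.lastIndexOf('.');
        if (lastDot < 0) {
            throw new IllegalArgumentException("expected a fully qualified name: " + recordInterfaceName);
        }
        String pkg = recordInterfaceName.substring(0, lastDot);
        String simpleName = recordInterfaceName.substring(lastDot + 1);
        // <package>.impl.pb.<SimpleName>PBImpl
        return pkg + "." + PB_IMPL_PACKAGE_SUFFIX + "." + simpleName + PB_IMPL_CLASS_SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(implClassName(
                "org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest"));
    }
}
```

A record interface whose implementation does not sit at the derived name would not be found by a lookup that follows this convention, which is why the suffixes matter.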
2. Get the IP address and port of the ResourceManager.
    InetSocketAddress rmAddress = conf.getSocketAddr(
            YarnConfiguration.RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_PORT);
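The three arguments are a configuration key, a default "host:port" string, and a default port. The following standalone sketch shows the fallback logic this implies, using only the JDK. It is an assumption-laden simplification (no IPv6 handling, no DNS resolution), and RmAddressSketch and resolve are hypothetical names, not part of Hadoop.

```java
import java.net.InetSocketAddress;

// Simplified sketch of "configured value, else default address, else default port".
// RmAddressSketch and resolve are hypothetical names, not Hadoop APIs.
final class RmAddressSketch {
    static InetSocketAddress resolve(String configured, String defaultAddress, int defaultPort) {
        // Fall back to the default address when nothing is configured.
        String target = (configured == null || configured.trim().isEmpty())
                ? defaultAddress
                : configured.trim();
        int colon = target.lastIndexOf(':');
        // Fall back to the default port when the address carries no port.
        String host = colon < 0 ? target : target.substring(0, colon);
        int port = colon < 0 ? defaultPort : Integer.parseInt(target.substring(colon + 1));
        // createUnresolved avoids a DNS lookup, which keeps the sketch side-effect free.
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        InetSocketAddress address = resolve("rm.example.com", "0.0.0.0:8032", 8032);
        System.out.println(address.getHostString() + ":" + address.getPort());
    }
}
```

Here "rm.example.com" is a placeholder host; in the real client the value comes from yarn-site.xml on the classpath.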
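3. Create a YarnRPC instance (the default implementation is org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC) and use it to obtain an ApplicationClientProtocol instance (the default implementation is ApplicationClientProtocolPBClientImpl). Then call it to get a new application ID.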
    // get Yarn RPC
    YarnRPC rpc = YarnRPC.create(conf);
    // get a proxy from yarn rpc, yarn rpc is a factory, can produce some protocol
    rmClientProtocol = (ApplicationClientProtocol) (rpc.getProxy(
            ApplicationClientProtocol.class, rmAddress, conf));
    // get response from yarn rpc proxy with sending the GetNewApplicationRequest
    // which contains a protobuf object.
    GetNewApplicationResponse newApp = rmClientProtocol
            .getNewApplication(request);
    System.err.println("get a new application id:"
            + newApp.getApplicationId());
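Exactly which classes the proxy initializes internally, and how, still needs further study.
In summary, Hadoop currently uses Protobuf as its default serialization framework, and YARN RPC currently goes through the protobuf-based factories.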
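The idea that the configured implementation decides which factory gets used can be sketched with plain reflection. This is only an illustration of the pattern under stated assumptions, not how Hadoop implements it: RpcFactorySketch, RpcFactory, ProtoRpcFactory, and the key "sketch.rpc.class" are all hypothetical.

```java
import java.util.Properties;

// Sketch of a configuration-driven factory lookup with a protobuf default.
// Every name here, including the "sketch.rpc.class" key, is hypothetical.
final class RpcFactorySketch {
    /** Stand-in for an RPC factory abstraction. */
    public interface RpcFactory {
        String serialization();
    }

    /** Stand-in for the protobuf-backed default implementation. */
    public static final class ProtoRpcFactory implements RpcFactory {
        public ProtoRpcFactory() { }

        @Override
        public String serialization() {
            return "protobuf";
        }
    }

    static final String RPC_CLASS_KEY = "sketch.rpc.class";

    /** Loads the configured factory class, falling back to the protobuf default. */
    static RpcFactory create(Properties conf) {
        String className = conf.getProperty(RPC_CLASS_KEY, ProtoRpcFactory.class.getName());
        try {
            Class<? extends RpcFactory> clazz =
                    Class.forName(className).asSubclass(RpcFactory.class);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        // No override configured, so the protobuf default is chosen.
        System.out.println(create(new Properties()).serialization());
    }
}
```

With this shape, supporting another serialization format later would mean adding another factory class and pointing the configuration at it, without touching the client code.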
The complete code follows.
    package com.jarvis.yarnapp;

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
    import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
    import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.exceptions.YarnException;
    import org.apache.hadoop.yarn.ipc.YarnRPC;
    import org.apache.hadoop.yarn.util.Records;

    public class ZJXClient {
        private ApplicationClientProtocol rmClientProtocol;

        public void run() throws YarnException, IOException {
            // get a request for communicating with yarn
            GetNewApplicationRequest request = Records
                    .newRecord(GetNewApplicationRequest.class);
            Configuration conf = new Configuration();
            // get resource manager address
            InetSocketAddress rmAddress = conf.getSocketAddr(
                    YarnConfiguration.RM_ADDRESS,
                    YarnConfiguration.DEFAULT_RM_ADDRESS,
                    YarnConfiguration.DEFAULT_RM_PORT);
            // get Yarn RPC
            YarnRPC rpc = YarnRPC.create(conf);
            // get a proxy from yarn rpc, yarn rpc is a factory, can produce some protocol
            rmClientProtocol = (ApplicationClientProtocol) (rpc.getProxy(
                    ApplicationClientProtocol.class, rmAddress, conf));
            // get response from yarn rpc proxy with sending the GetNewApplicationRequest
            // which contains a protobuf object.
            GetNewApplicationResponse newApp = rmClientProtocol
                    .getNewApplication(request);
            System.err.println("get a new application id:"
                    + newApp.getApplicationId());
        }

        public static void main(String[] args) throws YarnException, IOException {
            ZJXClient zjxClient = new ZJXClient();
            zjxClient.run();
        }
    }
Time: 2024-10-24 21:05:22