从main()函数開始
public static void main(String [] args) {
VersionInfo.logVersion();
new HMasterCommandLine(HMaster.class).doMain(args);
}
public void doMain(String args[]) {
try {
int ret = ToolRunner.run(HBaseConfiguration.create(), this, args);
if (ret != 0) {
System.exit(ret);
}
} catch (Exception e) {
LOG.error("Failed to run", e);
System.exit(-1);
}
}
这里是使用的hadoop的ToolRunner。
public static int More ...run(Configuration conf, Tool tool, String[] args)
throws Exception{
if(conf == null) {
conf = new Configuration();
}
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
//set the configuration back, so that Tool can configure itself
tool.setConf(conf);
//get the args w/o generic hadoop args
String[] toolArgs = parser.getRemainingArgs();
return tool.run(toolArgs);
}
事实上就是走了一遍GenericOptionsParser解析參数。又回到tool.run(toolArgs);而这里的tool事实上有这种继承关系HMasterCommandLine-->ServerCommandLine-->Tool,而这里的tool事实上有这种继承关系HMasterCommandLine的run()方法
public int run(String args[]) throws Exception {
Options opt = new Options();
opt.addOption("localRegionServers", true,
"RegionServers to start in master process when running standalone");
opt.addOption("masters", true, "Masters to start in this process");
opt.addOption("minRegionServers", true, "Minimum RegionServers needed to host user tables");
opt.addOption("backup", false, "Do not try to become HMaster until the primary fails");
CommandLine cmd;
try {
cmd = new GnuParser().parse(opt, args);
} catch (ParseException e) {
LOG.error("Could not parse: ", e);
usage(null);
return 1;
}
if (cmd.hasOption("minRegionServers")) {
String val = cmd.getOptionValue("minRegionServers");
getConf().setInt("hbase.regions.server.count.min",
Integer.valueOf(val));
LOG.debug("minRegionServers set to " + val);
}
// minRegionServers used to be minServers. Support it too.
if (cmd.hasOption("minServers")) {
String val = cmd.getOptionValue("minServers");
getConf().setInt("hbase.regions.server.count.min",
Integer.valueOf(val));
LOG.debug("minServers set to " + val);
}
// check if we are the backup master - override the conf if so
if (cmd.hasOption("backup")) {
getConf().setBoolean(HConstants.MASTER_TYPE_BACKUP, true);
}
// How many regionservers to startup in this process (we run regionservers in same process as
// master when we are in local/standalone mode. Useful testing)
if (cmd.hasOption("localRegionServers")) {
String val = cmd.getOptionValue("localRegionServers");
getConf().setInt("hbase.regionservers", Integer.valueOf(val));
LOG.debug("localRegionServers set to " + val);
}
// How many masters to startup inside this process; useful testing
if (cmd.hasOption("masters")) {
String val = cmd.getOptionValue("masters");
getConf().setInt("hbase.masters", Integer.valueOf(val));
LOG.debug("masters set to " + val);
}
List<String> remainingArgs = cmd.getArgList();
if (remainingArgs.size() != 1) {
usage(null);
return 1;
}
String command = remainingArgs.get(0);
if ("start".equals(command)) {
return startMaster();
} else if ("stop".equals(command)) {
return stopMaster();
} else if ("clear".equals(command)) {
return (ZNodeClearer.clear(getConf()) ? 0 : 1);
} else {
usage("Invalid command: " + command);
return 1;
}
}
启动的时候command="start",走startMaster()
private int startMaster() {
Configuration conf = getConf();
try {
// If ‘local‘, defer to LocalHBaseCluster instance. Starts master
// and regionserver both in the one JVM.
if (LocalHBaseCluster.isLocal(conf)) {
final MiniZooKeeperCluster zooKeeperCluster = new MiniZooKeeperCluster(conf);
File zkDataPath = new File(conf.get(HConstants.ZOOKEEPER_DATA_DIR));
int zkClientPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 0);
if (zkClientPort == 0) {
throw new IOException("No config value for "
+ HConstants.ZOOKEEPER_CLIENT_PORT);
}
zooKeeperCluster.setDefaultClientPort(zkClientPort);
// login the zookeeper server principal (if using security)
ZKUtil.loginServer(conf, "hbase.zookeeper.server.keytab.file",
"hbase.zookeeper.server.kerberos.principal", null);
int clientPort = zooKeeperCluster.startup(zkDataPath);
if (clientPort != zkClientPort) {
String errorMsg = "Could not start ZK at requested port of " +
zkClientPort + ". ZK was started at port: " + clientPort +
". Aborting as clients (e.g. shell) will not be able to find " +
"this ZK quorum.";
System.err.println(errorMsg);
throw new IOException(errorMsg);
}
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,
Integer.toString(clientPort));
// Need to have the zk cluster shutdown when master is shutdown.
// Run a subclass that does the zk cluster shutdown on its way out.
LocalHBaseCluster cluster = new LocalHBaseCluster(conf, conf.getInt("hbase.masters", 1),
conf.getInt("hbase.regionservers", 1), LocalHMaster.class, HRegionServer.class);
((LocalHMaster)cluster.getMaster(0)).setZKCluster(zooKeeperCluster);
cluster.startup();
waitOnMasterThreads(cluster);
} else {
logProcessInfo(getConf());
HMaster master = HMaster.constructMaster(masterClass, conf);
if (master.isStopped()) {
LOG.info("Won‘t bring the Master up as a shutdown is requested");
return 1;
}
master.start();
master.join();
if(master.isAborted())
throw new RuntimeException("HMaster Aborted");
}
} catch (Throwable t) {
LOG.error("Master exiting", t);
return 1;
}
return 0;
}
假设LocalHBaseCluster.isLocal(),启动一个本地集群LocalHBaseCluster,这里会在同一个JVM启动HMaster,HRegionServer
这里默认:hbase.cluster.distributed=false
/** Cluster is in distributed mode or not */
public static final String CLUSTER_DISTRIBUTED = "hbase.cluster.distributed";
/** Config for pluggable load balancers */
public static final String HBASE_MASTER_LOADBALANCER_CLASS = "hbase.master.loadbalancer.class";
/** Cluster is standalone or pseudo-distributed */
public static final boolean CLUSTER_IS_LOCAL = false;
public static boolean isLocal(final Configuration c) {
boolean mode = c.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
return(mode == HConstants.CLUSTER_IS_LOCAL);
}
假设hbase.cluster.distributed=true,创建一个HMaster实例
logProcessInfo(getConf());
HMaster master = HMaster.constructMaster(masterClass, conf);
if (master.isStopped()) {
LOG.info("Won‘t bring the Master up as a shutdown is requested");
return 1;
}
master.start();
master.join();
if(master.isAborted())
throw new RuntimeException("HMaster Aborted");
构造HMaster
Constructor<?
extends HMaster> c =
masterClass.getConstructor(Configuration.class);
return c.newInstance(conf);
正是的构造函数:
1,获取master的地址,ip:port
public static final String MASTER_PORT = "hbase.master.port";为端口,默认60000
int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
2,处理RPC的handler的数量。由hbase.regionserver.handler.count
3,创建HMaster的RPCServer实例.
4,启动RPCServer的线程,
5, 创建zookeeper,创建hbase相关节点
public HMaster(final Configuration conf)
throws IOException, KeeperException, InterruptedException {
this.conf = new Configuration(conf);
// Disable the block cache on the master
this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
// Server to handle client requests.
String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
conf.get("hbase.master.dns.interface", "default"),
conf.get("hbase.master.dns.nameserver", "default")));
int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
// Test that the hostname is reachable
InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
if (initialIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of hostname " + initialIsa);
}
// Verify that the bind address is reachable if set
String bindAddress = conf.get("hbase.master.ipc.address");
if (bindAddress != null) {
initialIsa = new InetSocketAddress(bindAddress, port);
if (initialIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of bind address " + initialIsa);
}
}
String name = "master/" + initialIsa.toString();
// Set how many times to retry talking to another server over HConnection.
HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
int numHandlers = conf.getInt("hbase.master.handler.count",
conf.getInt("hbase.regionserver.handler.count", 25));
this.rpcServer = new RpcServer(this, name, getServices(),
initialIsa, // BindAddress is IP we got for this server.
numHandlers,
0, // we dont use high priority handlers in master
conf,
0); // this is a DNC w/o high priority handlers
// Set our address.
this.isa = this.rpcServer.getListenerAddress();
// We don‘t want to pass isa‘s hostname here since it could be 0.0.0.0
this.serverName = new ServerName(hostname, this.isa.getPort(), System.currentTimeMillis());
this.rsFatals = new MemoryBoundedLogMessageBuffer(
conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
// login the zookeeper client principal (if using security)
ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
"hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
// initialize server principal (if using secure Hadoop)
User.login(conf, "hbase.master.keytab.file",
"hbase.master.kerberos.principal", this.isa.getHostName());
LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
", hbase.cluster.distributed=" + this.conf.getBoolean("hbase.cluster.distributed", false));
// set the thread name now we have an address
setName(MASTER + ":" + this.serverName.toShortString());
Replication.decorateMasterConfiguration(this.conf);
// Hack! Maps DFSClient => Master for logs. HDFS made this
// config param for task trackers, but we can piggyback off of it.
if (this.conf.get("mapred.task.id") == null) {
this.conf.set("mapred.task.id", "hb_m_" + this.serverName.toString());
}
this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
this.rpcServer.startThreads();
this.pauseMonitor = new JvmPauseMonitor(conf);
this.pauseMonitor.start();
// metrics interval: using the same property as region server.
this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
//should we check the compression codec type at master side, default true, HBASE-6370
this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this));
// Health checker thread.
int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
if (isHealthCheckerConfigured()) {
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
}
// Do we publish the status?
boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT);
Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
ClusterStatusPublisher.Publisher.class);
if (shouldPublish) {
if (publisherClass == null) {
LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
" is not set - not publishing status");
} else {
clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
}
}
distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
}
创建zookeeper。连接到zookeeper,创建hbase相关节点。
这里主要两个函数setNodeNames()和createBaseZNodes()
this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
public ZooKeeperWatcher(Configuration conf, String identifier,
Abortable abortable, boolean canCreateBaseZNode)
throws IOException, ZooKeeperConnectionException {
this.conf = conf;
// Capture a stack trace now. Will print it out later if problem so we can
// distingush amongst the myriad ZKWs.
try {
throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
} catch (Exception e) {
this.constructorCaller = e;
}
this.quorum = ZKConfig.getZKQuorumServersString(conf);
// Identifier will get the sessionid appended later below down when we
// handle the syncconnect event.
this.identifier = identifier;
this.abortable = abortable;
setNodeNames(conf);
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
if (canCreateBaseZNode) {
createBaseZNodes();
}
}
HMaster的主循环的run()方法,这里主要关心
1,becomeActiveMaster(startupStatus);
2,finishInitialization(startupStatus, false);
3,loop()
@Override
public void run() {
MonitoredTask startupStatus =
TaskMonitor.get().createStatus("Master startup");
startupStatus.setDescription("Master startup");
masterStartTime = System.currentTimeMillis();
try {
this.registeredZKListenersBeforeRecovery = this.zooKeeper.getListeners();
this.masterAddressManager = new MasterAddressTracker(getZooKeeperWatcher(), this);
this.masterAddressManager.start();
// Put up info server.
int port = this.conf.getInt("hbase.master.info.port", 60010);
if (port >= 0) {
String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
this.infoServer = new InfoServer(MASTER, a, port, false, this.conf);
this.infoServer.addServlet("status", "/master-status", MasterStatusServlet.class);
this.infoServer.addServlet("dump", "/dump", MasterDumpServlet.class);
this.infoServer.setAttribute(MASTER, this);
this.infoServer.start();
}
/*
* Block on becoming the active master.
*
* We race with other masters to write our address into ZooKeeper. If we
* succeed, we are the primary/active master and finish initialization.
*
* If we do not succeed, there is another active master and we should
* now wait until it dies to try and become the next active master. If we
* do not succeed on our first attempt, this is no longer a cluster startup.
*/
becomeActiveMaster(startupStatus);
// We are either the active master or we were asked to shutdown
if (!this.stopped) {
finishInitialization(startupStatus, false);
loop();
}
} catch (Throwable t) {
// HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
if (t instanceof NoClassDefFoundError &&
t.getMessage().contains("org/apache/hadoop/hdfs/protocol/FSConstants$SafeModeAction")) {
// improved error message for this special case
abort("HBase is having a problem with its Hadoop jars. You may need to "
+ "recompile HBase against Hadoop version "
+ org.apache.hadoop.util.VersionInfo.getVersion()
+ " or change your hadoop jars to start properly", t);
} else {
abort("Unhandled exception. Starting shutdown.", t);
}
} finally {
startupStatus.cleanup();
stopChores();
// Wait for all the remaining region servers to report in IFF we were
// running a cluster shutdown AND we were NOT aborting.
if (!this.abort && this.serverManager != null &&
this.serverManager.isClusterShutdown()) {
this.serverManager.letRegionServersShutdown();
}
stopServiceThreads();
// Stop services started for both backup and active masters
if (this.activeMasterManager != null) this.activeMasterManager.stop();
if (this.catalogTracker != null) this.catalogTracker.stop();
if (this.serverManager != null) this.serverManager.stop();
if (this.assignmentManager != null) this.assignmentManager.stop();
if (this.fileSystemManager != null) this.fileSystemManager.stop();
if (this.snapshotManager != null) this.snapshotManager.stop("server shutting down.");
this.zooKeeper.close();
}
LOG.info("HMaster main thread exiting");
}
时间: 2024-11-02 17:04:22