The Driver class implements the interface
org.apache.hadoop.hive.ql.processors.CommandProcessor
and overrides its run method, which defines how common SQL statements are executed.
public class Driver implements CommandProcessor
The methods are called in the following order:
run ---> runInternal ---> (createTxnManager + recordValidTxns) ---> compileInternal ---> compile ---> analyzer (BaseSemanticAnalyzer) ---> execute
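Of these, compile and execute are the two most important methods:
compile performs the syntactic and semantic analysis and generates the execution plan.
execute runs the physical plan, i.e. it submits the corresponding MapReduce jobs.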
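To make the call order concrete, here is a minimal sketch of the chain. The class DriverFlowSketch and its trace list are hypothetical stand-ins, not Hive code; the return code 10 for a failed compile is borrowed from the semantic-error path purely for illustration, and the transaction manager steps are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the Driver call chain; not Hive's actual code.
class DriverFlowSketch {
    // Records the order in which the methods are entered.
    final List<String> trace = new ArrayList<>();

    int run(String command) {
        trace.add("run");
        return runInternal(command);
    }

    private int runInternal(String command) {
        trace.add("runInternal");
        int ret = compileInternal(command);
        if (ret != 0) {
            return ret; // compilation failed, so execute is never reached
        }
        return execute();
    }

    private int compileInternal(String command) {
        trace.add("compileInternal");
        return compile(command);
    }

    private int compile(String command) {
        trace.add("compile");
        // Stand-in for parsing and semantic analysis: blank commands are rejected.
        return command.trim().isEmpty() ? 10 : 0;
    }

    private int execute() {
        trace.add("execute");
        return 0;
    }

    public static void main(String[] args) {
        DriverFlowSketch driver = new DriverFlowSketch();
        int ret = driver.run("select 1");
        System.out.println(ret + " " + driver.trace);
    }
}
```

The point of the sketch is only the control flow: a non-zero return code from the compile stage short-circuits the chain before execute.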
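Printing the perflog output gives a simple sequence diagram of the Driver class.
Next, let's look at how several commonly used methods of the Driver class are implemented: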
1) createTxnManager obtains the class currently configured to implement locking, for example:
org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager
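2) checkConcurrency determines whether the current Hive configuration supports concurrency control:
boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
It does this by checking the hive.support.concurrency parameter, which defaults to false.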
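The lookup with a false default can be sketched without Hive on the classpath. The example below uses java.util.Properties in place of HiveConf, so the helper supportsConcurrency is an illustrative assumption; only the property name comes from the text.

```java
import java.util.Properties;

// Sketch only: java.util.Properties stands in for HiveConf here.
class ConcurrencyFlagSketch {
    // Property name taken from the text above.
    static final String KEY = "hive.support.concurrency";

    // Hypothetical helper: returns false when the property is not set,
    // mirroring the default described above.
    static boolean supportsConcurrency(Properties conf) {
        return Boolean.parseBoolean(conf.getProperty(KEY, "false"));
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(supportsConcurrency(conf)); // property not set
        conf.setProperty(KEY, "true");
        System.out.println(supportsConcurrency(conf)); // property enabled
    }
}
```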
3) getClusterStatus calls the getClusterStatus method of the JobClient class to obtain the status of the cluster:
public ClusterStatus getClusterStatus() throws Exception {
  ClusterStatus cs;
  try {
    JobConf job = new JobConf(conf, ExecDriver.class);
    JobClient jc = new JobClient(job);
    cs = jc.getClusterStatus();
  } catch (Exception e) {
    e.printStackTrace();
    throw e;
  }
  LOG.info("Returning cluster status: " + cs.toString());
  return cs;
}
4) getSchema returns the schema information of the table.
5) doAuthorization/doAuthorizationV2/getHivePrivObjects
These perform the permission checks on the SQL statement when authorization is enabled.
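6) getLockObjects/acquireReadWriteLocks/releaseLocks
These are all lock-related methods. getLockObjects builds the lock objects (lock path, lock mode, and so on) and returns a list containing all of them; acquireReadWriteLocks handles acquiring the locks; releaseLocks releases them: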
getLockObjects:
private List<HiveLockObj> getLockObjects(Database d, Table t, Partition p, HiveLockMode mode)
    throws SemanticException {
  List<HiveLockObj> locks = new LinkedList<HiveLockObj>();

  HiveLockObjectData lockData =
      new HiveLockObjectData(plan.getQueryId(),
          String.valueOf(System.currentTimeMillis()),
          "IMPLICIT",
          plan.getQueryStr());

  if (d != null) {
    // Database-level lock
    locks.add(new HiveLockObj(new HiveLockObject(d.getName(), lockData), mode));
    return locks;
  }

  if (t != null) {
    // Table-level locks
    locks.add(new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode));
    locks.add(new HiveLockObj(new HiveLockObject(t, lockData), mode));
    mode = HiveLockMode.SHARED;
    locks.add(new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode));
    return locks;
  }

  if (p != null) {
    // Partition-level locks
    locks.add(new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode));
    if (!(p instanceof DummyPartition)) {
      locks.add(new HiveLockObj(new HiveLockObject(p, lockData), mode));
    }

    // All the parents are locked in shared mode
    mode = HiveLockMode.SHARED;

    // For dummy partitions, only partition name is needed
    String name = p.getName();
    if (p instanceof DummyPartition) {
      name = p.getName().split("@")[2];
    }

    String partialName = "";
    String[] partns = name.split("/");
    int len = p instanceof DummyPartition ? partns.length : partns.length - 1;
    Map<String, String> partialSpec = new LinkedHashMap<String, String>();
    for (int idx = 0; idx < len; idx++) {
      String partn = partns[idx];
      partialName += partn;
      String[] nameValue = partn.split("=");
      assert(nameValue.length == 2);
      partialSpec.put(nameValue[0], nameValue[1]);
      try {
        locks.add(new HiveLockObj(
            new HiveLockObject(new DummyPartition(p.getTable(), p.getTable().getDbName()
                + "/" + p.getTable().getTableName()
                + "/" + partialName, partialSpec), lockData), mode));
        partialName += "/";
      } catch (HiveException e) {
        throw new SemanticException(e.getMessage());
      }
    }

    locks.add(new HiveLockObj(new HiveLockObject(p.getTable(), lockData), mode));
    locks.add(new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode));
  }
  return locks;
}
acquireReadWriteLocks calls the acquireLocks method of the concrete lock implementation class.
releaseLocks calls the releaseLocks method of the concrete lock implementation class.
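The partition branch of getLockObjects is the hardest part to follow, so here is a rough sketch of the order in which it adds locks for a regular (non-dummy) partition. The class PartitionLockPathsSketch is hypothetical, locks are represented as plain "path mode" strings rather than HiveLockObj instances, and the partition name is assumed to have the form key=value/key=value.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: models the lock order for a non-dummy partition with plain strings.
class PartitionLockPathsSketch {

    // Returns "path mode" entries in the order the partition branch adds them.
    static List<String> lockPaths(String db, String table, String partName, String mode) {
        List<String> locks = new ArrayList<>();
        locks.add(db + " " + mode);                                 // database, requested mode
        locks.add(db + "/" + table + "/" + partName + " " + mode);  // the partition itself

        // Every parent of the partition is locked in SHARED mode.
        String[] parts = partName.split("/");
        StringBuilder partial = new StringBuilder();
        for (int i = 0; i < parts.length - 1; i++) {
            partial.append(parts[i]);
            locks.add(db + "/" + table + "/" + partial + " SHARED");
            partial.append("/");
        }
        locks.add(db + "/" + table + " SHARED");                    // parent table
        locks.add(db + " SHARED");                                  // parent database
        return locks;
    }

    public static void main(String[] args) {
        for (String lock : lockPaths("default", "logs", "ds=2014-01-01/hr=12", "EXCLUSIVE")) {
            System.out.println(lock);
        }
    }
}
```

For a two-level partition such as ds=2014-01-01/hr=12, the sketch yields the database and the partition in the requested mode, followed by the ds=2014-01-01 prefix, the table, and the database again in SHARED mode.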
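7) run is the entry point of the Driver class. It calls runInternal, which is the method we focus on here. Its main steps are:
run the hooks configured in hive.exec.driver.run.hooks, calling preDriverRun on each HiveDriverRunHook ---> check whether concurrency is supported and obtain the transaction manager implementation ---> compileInternal ---> perform the lock-related work (decide whether only MapReduce jobs are locked, acquire the locks, and so on) ---> call execute ---> release the locks ---> call postDriverRun on each HiveDriverRunHook ---> return a CommandProcessorResponse object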
The relevant code:
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
    throws CommandNeedRetryException {
  errorMessage = null;
  SQLState = null;
  downstreamError = null;

  if (!validateConfVariables()) {
    return new CommandProcessorResponse(12, errorMessage, SQLState);
  }

  HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf, command);
  // Get all the driver run hooks and pre-execute them.
  List<HiveDriverRunHook> driverRunHooks;
  try {
    // Load the hooks configured in hive.exec.driver.run.hooks
    driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
        HiveDriverRunHook.class);
    for (HiveDriverRunHook driverRunHook : driverRunHooks) {
      // Call preDriverRun on each HiveDriverRunHook implementation
      driverRunHook.preDriverRun(hookContext);
    }
  } catch (Exception e) {
    errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
    SQLState = ErrorMsg.findSQLState(e.getMessage());
    downstreamError = e;
    console.printError(errorMessage + "\n"
        + org.apache.hadoop.util.StringUtils.stringifyException(e));
    return new CommandProcessorResponse(12, errorMessage, SQLState);
  }

  // Reset the perf logger
  PerfLogger perfLogger = PerfLogger.getPerfLogger(true);
  perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
  perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);

  int ret;
  boolean requireLock = false;
  boolean ckLock = false;
  try {
    // Check whether concurrency is supported
    ckLock = checkConcurrency();
    // Obtain the transaction manager implementation, commonly
    // org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager
    createTxnManager();
  } catch (SemanticException e) {
    errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
    SQLState = ErrorMsg.findSQLState(e.getMessage());
    downstreamError = e;
    console.printError(errorMessage, "\n"
        + org.apache.hadoop.util.StringUtils.stringifyException(e));
    ret = 10;
    return new CommandProcessorResponse(ret, errorMessage, SQLState);
  }
  ret = recordValidTxns();
  if (ret != 0) return new CommandProcessorResponse(ret, errorMessage, SQLState);

  if (!alreadyCompiled) {
    ret = compileInternal(command);  // call compileInternal
    if (ret != 0) {
      return new CommandProcessorResponse(ret, errorMessage, SQLState);
    }
  }

  // the reason that we set the txn manager for the cxt here is because each
  // query has its own ctx object. The txn mgr is shared across the
  // same instance of Driver, which can run multiple queries.
  ctx.setHiveTxnManager(txnMgr);

  if (ckLock) {
    // Check whether only MapReduce jobs should be locked
    // (hive.lock.mapred.only.operation, default false)
    boolean lockOnlyMapred = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_LOCK_MAPRED_ONLY);
    if (lockOnlyMapred) {
      Queue<Task<? extends Serializable>> taskQueue = new LinkedList<Task<? extends Serializable>>();
      taskQueue.addAll(plan.getRootTasks());
      while (taskQueue.peek() != null) {
        Task<? extends Serializable> tsk = taskQueue.remove();
        requireLock = requireLock || tsk.requireLock();
        if (requireLock) {
          break;
        }
        if (tsk instanceof ConditionalTask) {
          taskQueue.addAll(((ConditionalTask) tsk).getListTasks());
        }
        if (tsk.getChildTasks() != null) {
          taskQueue.addAll(tsk.getChildTasks());
        }
        // does not add back up task here, because back up task should be the same
        // type of the original task.
      }
    } else {
      requireLock = true;
    }
  }

  if (requireLock) {
    // Acquire the locks
    ret = acquireReadWriteLocks();
    if (ret != 0) {
      try {
        releaseLocks(ctx.getHiveLocks());
      } catch (LockException e) {
        // Not much to do here
      }
      return new CommandProcessorResponse(ret, errorMessage, SQLState);
    }
  }

  ret = execute();  // run the job
  if (ret != 0) {
    //if needRequireLock is false, the release here will do nothing because there is no lock
    try {
      releaseLocks(ctx.getHiveLocks());
    } catch (LockException e) {
      // Nothing to do here
    }
    return new CommandProcessorResponse(ret, errorMessage, SQLState);
  }

  //if needRequireLock is false, the release here will do nothing because there is no lock
  try {
    releaseLocks(ctx.getHiveLocks());
  } catch (LockException e) {
    errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
    SQLState = ErrorMsg.findSQLState(e.getMessage());
    downstreamError = e;
    console.printError(errorMessage + "\n"
        + org.apache.hadoop.util.StringUtils.stringifyException(e));
    return new CommandProcessorResponse(12, errorMessage, SQLState);
  }

  perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_RUN);
  perfLogger.close(LOG, plan);

  // Take all the driver run hooks and post-execute them.
  try {
    for (HiveDriverRunHook driverRunHook : driverRunHooks) {
      // Call postDriverRun on each HiveDriverRunHook implementation
      driverRunHook.postDriverRun(hookContext);
    }
  } catch (Exception e) {
    errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
    SQLState = ErrorMsg.findSQLState(e.getMessage());
    downstreamError = e;
    console.printError(errorMessage + "\n"
        + org.apache.hadoop.util.StringUtils.stringifyException(e));
    return new CommandProcessorResponse(12, errorMessage, SQLState);
  }

  return new CommandProcessorResponse(ret);
}
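When hive.lock.mapred.only.operation is enabled, runInternal walks the task graph breadth-first and stops as soon as one task reports that it needs a lock. The sketch below isolates that traversal. The Node class is a hypothetical stand-in for Hive's Task, and the ConditionalTask case is folded into the ordinary child list for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Sketch only: breadth-first search for the first task that requires a lock.
class RequireLockSketch {

    // Hypothetical minimal task node; Hive's Task class carries far more state.
    static class Node {
        final boolean requireLock;
        final List<Node> children = new ArrayList<>();

        Node(boolean requireLock) {
            this.requireLock = requireLock;
        }

        Node child(Node c) {
            children.add(c);
            return this;
        }
    }

    static boolean anyTaskRequiresLock(List<Node> roots) {
        Queue<Node> queue = new ArrayDeque<>(roots);
        while (!queue.isEmpty()) {
            Node task = queue.remove();
            if (task.requireLock) {
                return true; // one locking task is enough, stop early
            }
            queue.addAll(task.children);
        }
        return false;
    }

    public static void main(String[] args) {
        Node withLock = new Node(false).child(new Node(false).child(new Node(true)));
        Node withoutLock = new Node(false).child(new Node(false));
        System.out.println(anyTaskRequiresLock(Arrays.asList(withLock)));
        System.out.println(anyTaskRequiresLock(Arrays.asList(withoutLock)));
    }
}
```

The early exit matters because the answer is a single boolean: once any task in the plan needs a lock, the whole query goes through acquireReadWriteLocks.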
8) Next, the compileInternal method:
private static final Object compileMonitor = new Object();

private int compileInternal(String command) {
  int ret;
  synchronized (compileMonitor) {
    ret = compile(command);  // call compile
  }
  if (ret != 0) {
    try {
      releaseLocks(ctx.getHiveLocks());
    } catch (LockException e) {
      LOG.warn("Exception in releasing locks. "
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
    }
  }
  return ret;
}
It calls compile, which analyzes the command and generates the Tasks. The implementation of compile is covered in detail later.
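Because compileMonitor is a static field, the synchronized block serializes compilation across every Driver instance in the same JVM, not just within one instance. The sketch below illustrates that effect with plain threads. The counters and the sleep are assumptions added to make the serialization observable, and nothing here calls Hive.

```java
// Sketch only: a static monitor lets at most one thread "compile" at a time.
class CompileMonitorSketch {
    private static final Object COMPILE_MONITOR = new Object();
    private static int active = 0;     // threads currently inside the block
    private static int maxActive = 0;  // highest value of active ever observed

    static int compileInternal(String command) {
        synchronized (COMPILE_MONITOR) {
            active++;
            maxActive = Math.max(maxActive, active);
            try {
                Thread.sleep(5); // stand-in for the real compile work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            active--;
        }
        return 0;
    }

    // Runs the given number of threads and reports the peak concurrency seen.
    static int maxConcurrentCompiles(int threads) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> compileInternal("select 1"));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        synchronized (COMPILE_MONITOR) {
            return maxActive;
        }
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrentCompiles(4));
    }
}
```

A practical consequence is that a slow compile in one session can delay compilation in other sessions sharing the process, while execution is not covered by this monitor.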
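9) execute submits the tasks, waits for them to finish, and prints information about the run, such as the time consumed.
(There is a lot to cover here as well, so it will be explained separately later.)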