By yhluo · July 29, 2015 · Impala
Impala Source Code Directory Structure
SQL Parsing
Impala's SQL parsing and execution-plan generation are implemented by impala-frontend (Java); the listening port is 21000. A user submits a request through the Beeswax interface BeeswaxService.query(); on the impalad side the request is handled by void ImpalaServer::query(QueryHandle& query_handle, const Query& query) (ImpalaServer.h).
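For orientation, the sketch below shows how a client might submit a statement over that Beeswax interface using the generated Thrift bindings. It is only an assumption-laden illustration: the host name, the statement text, and the com.cloudera.beeswax.api package/field names are illustrative and not taken from this article.

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import com.cloudera.beeswax.api.BeeswaxService;
import com.cloudera.beeswax.api.Query;
import com.cloudera.beeswax.api.QueryHandle;

public class BeeswaxSubmitSketch {
  public static void main(String[] args) throws Exception {
    // Connect to the impalad Beeswax port (21000 by default, as noted above).
    TSocket transport = new TSocket("impalad-host", 21000);  // hypothetical host name
    transport.open();
    BeeswaxService.Client client =
        new BeeswaxService.Client(new TBinaryProtocol(transport));

    Query q = new Query();
    q.query = "select count(*) from some_table";  // ends up in query.query on the server
    // This call is what ImpalaServer::query() (shown below) handles on the impalad side.
    QueryHandle handle = client.query(q);
    System.out.println("submitted, handle id = " + handle.id);
    transport.close();
  }
}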
void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
  VLOG_QUERY << "query(): query=" << query.query;
  ScopedSessionState session_handle(this);
  shared_ptr<SessionState> session;
  RAISE_IF_ERROR(  // return a unique id for the current connection, mark the session as in use and save it
      session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
      SQLSTATE_GENERAL_ERROR);

  // Convert the Query into a TQueryCtx
  // raise general error for request conversion error;
  TQueryCtx query_ctx;
  RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);

  // raise Syntax error or access violation; it's likely to be syntax/analysis error
  // TODO: that may not be true; fix this
  shared_ptr<QueryExecState> exec_state;
  // Start executing the query asynchronously. Internally this calls ImpalaServer::Execute(),
  // which turns the TQueryCtx into a QueryExecState, registers it and calls Coordinator::Execute().
  RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
      SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);

  exec_state->UpdateQueryState(QueryState::RUNNING);
  // start thread to wait for results to become available, which will allow
  // us to advance query state to FINISHED or EXCEPTION
  exec_state->WaitAsync();
  // Once the query is running do a final check for session closure and add it to the
  // set of in-flight queries.
  Status status = SetQueryInflight(session, exec_state);
  // ...
  UnregisterQuery(exec_state->query_id(), false, &status);
  RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
  // ...
  TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
}
Here QueryToTQueryContext(query, &query_ctx) converts the Query into a TQueryCtx. Its implementation (ImpalaServer.h) is as follows:
Status ImpalaServer::QueryToTQueryContext(const Query& query,
    TQueryCtx* query_ctx) {
  query_ctx->request.stmt = query.query;
  VLOG_QUERY << "query: " << ThriftDebugString(query);
  shared_ptr<SessionState> session;
  const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
  RETURN_IF_ERROR(GetSessionState(session_id, &session));
  DCHECK(session != NULL);

  // The session is created when the client connects. Depending on the underlying
  // transport, the username may be known at that time. If the username hasn't been
  // set yet, set it now.
  lock_guard<mutex> l(session->lock);
  if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
  query_ctx->request.query_options = session->default_query_options;

  // Build the Thrift representation of this SessionState so it can be serialized
  // to the frontend.
  session->ToThrift(session_id, &query_ctx->session);

  // Override default query options with Query.Configuration
  if (query.__isset.configuration) {
    BOOST_FOREACH(const string& option, query.configuration) {
      RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->request.query_options));
    }
    VLOG_QUERY << "TClientRequest.queryOptions: "
               << ThriftDebugString(query_ctx->request.query_options);
  }
  // ...
}
ImpalaServer::Execute() (ImpalaServer.h) is then called internally to turn the TQueryCtx into a TExecRequest; the actual work is delegated to ImpalaServer::ExecuteInternal(). The code is as follows:
Status ImpalaServer::Execute(TQueryCtx* query_ctx,
    shared_ptr<SessionState> session_state,
    shared_ptr<QueryExecState>* exec_state) {
  PrepareQueryContext(query_ctx);
  bool registered_exec_state;
  ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);

  // Redact the SQL stmt and update the query context
  string stmt = replace_all_copy(query_ctx->request.stmt, "\n", " ");
  query_ctx->request.__set_redacted_stmt((const string) stmt);

  // Implements the Execute() logic, but does not unregister the query on error
  Status status = ExecuteInternal(*query_ctx, session_state, &registered_exec_state,
      exec_state);
  if (!status.ok() && registered_exec_state) {
    UnregisterQuery((*exec_state)->query_id(), false, &status);
  }
  // ...
}
That function calls ImpalaServer::ExecuteInternal() (ImpalaServer.h), which generates the TExecRequest by invoking frontend.createExecRequest() through the JNI interface. The code is as follows:
Status ImpalaServer::ExecuteInternal(
    const TQueryCtx& query_ctx,
    shared_ptr<SessionState> session_state,
    bool* registered_exec_state,
    shared_ptr<QueryExecState>* exec_state) {
  DCHECK(session_state != NULL);
  *registered_exec_state = false;
  // ...
    return Status("This Impala server is offline. Please retry your query later.");
  // ...
  exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
      this, session_state));

  (*exec_state)->query_events()->MarkEvent("Start execution");
  // ...
  // Keep a lock on exec_state so that registration and setting
  // result_metadata are atomic.
  //
  // Note: this acquires the exec_state lock *before* the
  // query_exec_state_map_ lock. This is the opposite of
  // GetQueryExecState(..., true), and therefore looks like a
  // candidate for deadlock. The reason this works here is that
  // GetQueryExecState cannot find exec_state (under the exec state
  // map lock) and take its lock until RegisterQuery has
  // finished. By that point, the exec state map lock will have been
  // given up, so the classic deadlock interleaving is not possible.
  lock_guard<mutex> l(*(*exec_state)->lock());

  // register exec state as early as possible so that queries that
  // take a long time to plan show up, and to handle incoming status
  // reports before execution starts.
  RETURN_IF_ERROR(RegisterQuery(session_state, *exec_state));
  *registered_exec_state = true;

  RETURN_IF_ERROR((*exec_state)->UpdateQueryStatus(
      // Call frontend.createExecRequest() through the JNI interface to generate the TExecRequest
      exec_env_->frontend()->GetExecRequest(query_ctx, &result)));
  (*exec_state)->query_events()->MarkEvent("Planning finished");
  (*exec_state)->summary_profile()->AddEventSequence(
      result.timeline.name, result.timeline);
  if (result.__isset.result_set_metadata) {
    (*exec_state)->set_result_metadata(result.result_set_metadata);
  }
  // ...
  VLOG(2) << "Execution request: " << ThriftDebugString(result);

  // start execution of query; also starts fragment status reports
  RETURN_IF_ERROR((*exec_state)->Exec(&result));
  if (result.stmt_type == TStmtType::DDL) {
    Status status = UpdateCatalogMetrics();
    // ...
    VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetDetail();
  }
  // ...
  if ((*exec_state)->coord() != NULL) {
    const unordered_set<TNetworkAddress>& unique_hosts =
        (*exec_state)->schedule()->unique_hosts();
    if (!unique_hosts.empty()) {
      lock_guard<mutex> l(query_locations_lock_);
      BOOST_FOREACH(const TNetworkAddress& port, unique_hosts) {
        query_locations_[port].insert((*exec_state)->query_id());
      }
    }
  }
  // ...
}
Frontend::GetExecRequest() (Frontend.h) invokes frontend.createExecRequest() through the JNI interface to generate the TExecRequest. Its implementation is as follows:
Status Frontend::GetExecRequest(
    const TQueryCtx& query_ctx, TExecRequest* result) {
  return JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result);
}
JniUtil::CallJniMethod() (jni-util.h) is implemented as follows:
/// Utility methods to avoid repeating lots of the JNI call boilerplate. It seems these
/// must be defined in the header to compile properly.
template <typename T>
static Status CallJniMethod(const jobject& obj, const jmethodID& method, const T& arg) {
  JNIEnv* jni_env = getJNIEnv();
  jbyteArray request_bytes;
  JniLocalFrame jni_frame;
  RETURN_IF_ERROR(jni_frame.push(jni_env));
  RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &arg, &request_bytes));
  jni_env->CallObjectMethod(obj, method, request_bytes);
  RETURN_ERROR_IF_EXC(jni_env);
  // ...
}
At this point control crosses over (via Thrift serialization) to the Java frontend, which builds the execution plan tree.
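On the Java side, the bytes produced by SerializeThriftMsg() are received by a JNI entry point, deserialized back into a TQueryCtx, and passed to Frontend.createExecRequest(); the resulting TExecRequest is serialized again for the C++ caller. The following is only a simplified sketch of that glue, assuming the standard libthrift (de)serializers; the production code lives in JniFrontend.java and its exact helper names may differ.

import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import com.cloudera.impala.thrift.TExecRequest;
import com.cloudera.impala.thrift.TQueryCtx;

public class JniFrontendSketch {
  private final Frontend frontend_;  // constructed elsewhere with the catalog and authorization config

  public JniFrontendSketch(Frontend frontend) { this.frontend_ = frontend; }

  // Called from the C++ side (via JniUtil::CallJniMethod) with a serialized TQueryCtx.
  public byte[] createExecRequest(byte[] thriftQueryContext) throws Exception {
    TQueryCtx queryCtx = new TQueryCtx();
    new TDeserializer(new TBinaryProtocol.Factory()).deserialize(queryCtx, thriftQueryContext);

    StringBuilder explainString = new StringBuilder();
    TExecRequest result = frontend_.createExecRequest(queryCtx, explainString);

    // Serialize the TExecRequest so the backend can turn it back into a C++ struct.
    return new TSerializer(new TBinaryProtocol.Factory()).serialize(result);
  }
}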
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString) (Frontend.java) is the most important method here: it creates a populated TExecRequest from the supplied TQueryCtx. The analysis part of the code is as follows:
/**
 * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
 */
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
    throws ImpalaException {
  // Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
  AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
  EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
  timeline.markEvent("Analysis finished");
  // ...
It first analyzes the submitted SQL statement by calling analyzeStmt() (Frontend.java), which is implemented as follows:
/**
 * Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
 */
private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx)
    throws AnalysisException, InternalException, AuthorizationException {
  AnalysisContext analysisCtx = new AnalysisContext(dsqldCatalog_, queryCtx, authzConfig_);
  LOG.debug("analyze query " + queryCtx.request.stmt);
  // ...
  // 2) analysis fails because of missing tables and throws an AnalysisException
  // 3) analysis fails and throws an AuthorizationException
  // ...
  try {
    // The actual analysis logic is implemented by AnalysisContext.analyze()
    analysisCtx.analyze(queryCtx.request.stmt);
    Preconditions.checkState(analysisCtx.getAnalyzer().getMissingTbls().isEmpty());
    return analysisCtx.getAnalysisResult();
  } catch (AnalysisException e) {
    Set<TableName> missingTbls = analysisCtx.getAnalyzer().getMissingTbls();
    // Only re-throw the AnalysisException if there were no missing tables.
    if (missingTbls.isEmpty()) throw e;

    // Some tables/views were missing, request and wait for them to load.
    if (!requestTblLoadAndWait(missingTbls, MISSING_TBL_LOAD_WAIT_TIMEOUT_MS)) {
      LOG.info(String.format("Missing tables were not received in %dms. Load " +
          "request will be retried.", MISSING_TBL_LOAD_WAIT_TIMEOUT_MS));
    }
    // ...
  }
  // ...
  // Authorize all accesses.
  // AuthorizationExceptions must take precedence over any AnalysisException
  // that has been thrown, so perform the authorization first.
  analysisCtx.getAnalyzer().authorize(getAuthzChecker());
AnalyzerContext.AnalyzeResult.Analyzer 对象是个存放这个 SQL 所涉及到的所有信息
(包含Table, conjunct, slot,slotRefMap, eqJoinConjuncts等)的知识库,所有跟这个
SQL 有关的东西都会存到 Analyzer对象里面。该类的定义可以查看
Analyzer.java
AnalyzerContex.analyze()
(AnalyzeContext.java)
的具体实现代码如下:
/**
 * Parse and analyze 'stmt'. If 'stmt' is a nested query (i.e. query that
 * contains subqueries), it is also rewritten by performing subquery unnesting.
 * The transformed stmt is then re-analyzed in a new analysis context.
 */
public void analyze(String stmt) throws AnalysisException {
  Analyzer analyzer = new Analyzer(catalog_, queryCtx_, authzConfig_);
  analyze(stmt, analyzer);
}
This analyze() delegates to the overload analyze(String stmt, Analyzer analyzer) (AnalysisContext.java), which does the actual work:
/**
 * Parse and analyze 'stmt' using a specified Analyzer.
 */
public void analyze(String stmt, Analyzer analyzer) throws AnalysisException {
  SqlScanner input = new SqlScanner(new StringReader(stmt));
  SqlParser parser = new SqlParser(input);
  // ...
  analysisResult_ = new AnalysisResult();
  analysisResult_.analyzer_ = analyzer;
  if (analysisResult_.analyzer_ == null) {
    analysisResult_.analyzer_ = new Analyzer(catalog_, queryCtx_, authzConfig_);
  }
  try {
    analysisResult_.stmt_ = (StatementBase) parser.parse().value;
    if (analysisResult_.stmt_ == null) return;

    // For CTAS(Create Table As Select), we copy the create statement
    // in case we have to create a new CTAS statement after a query rewrite.
    if (analysisResult_.stmt_ instanceof CreateTableAsSelectStmt) {
      analysisResult_.tmpCreateTableStmt_ =
          ((CreateTableAsSelectStmt) analysisResult_.stmt_).getCreateStmt().clone();
    }

    analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
    boolean isExplain = analysisResult_.isExplainStmt();

    // Check if we need to rewrite the statement.
    if (analysisResult_.requiresRewrite()) {
      StatementBase rewrittenStmt = StmtRewriter.rewrite(analysisResult_);
      // Re-analyze the rewritten statement.
      Preconditions.checkNotNull(rewrittenStmt);
      analysisResult_ = new AnalysisResult();
      analysisResult_.analyzer_ = new Analyzer(catalog_, queryCtx_, authzConfig_);
      analysisResult_.stmt_ = rewrittenStmt;
      analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
      LOG.trace("rewrittenStmt: " + rewrittenStmt.toSql());
    }
    if (isExplain) analysisResult_.stmt_.setIsExplain();
  } catch (AnalysisException e) {
    // Don't wrap AnalysisExceptions in another AnalysisException
    throw e;
  } catch (Exception e) {
    throw new AnalysisException(parser.getErrorMsg(stmt), e);
  }
}
The function above does the actual lexing and parsing with the SqlScanner and SqlParser classes; see
sql-scanner.flex
and
sql-parser.y
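To see just the lexing/parsing step in isolation, the two classes can also be driven directly, exactly as in the analyze() code above. The statement text below is only an example, and the com.cloudera.impala.analysis package name reflects the pre-Apache source tree this article is describing.

import java.io.StringReader;
import com.cloudera.impala.analysis.SqlParser;
import com.cloudera.impala.analysis.SqlScanner;
import com.cloudera.impala.analysis.StatementBase;

public class ParseOnlySketch {
  public static void main(String[] args) throws Exception {
    String stmt = "select c1, count(*) from t1 where c2 > 10 group by c1";
    // Lex with SqlScanner and parse with SqlParser, as AnalysisContext.analyze() does.
    SqlScanner input = new SqlScanner(new StringReader(stmt));
    SqlParser parser = new SqlParser(input);
    // parse() returns a CUP Symbol whose value is the root of the statement's AST.
    StatementBase parsedStmt = (StatementBase) parser.parse().value;
    System.out.println("parsed: " + parsedStmt.toSql());
  }
}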
The rough flow of analyzing a SQL statement is as follows (a concrete example follows the list):
- Process the tables referenced by the statement (the TableRefs), which are extracted from the FROM clause (keywords from, join, on/using). Note that a JOIN and its on/using condition are stored and analyzed in the TableRef of the right-hand table of that JOIN. Each TableRef is analyze()d in turn and registered with the Analyzer via registerBaseTableRef() (filling in its TupleDescriptor). If a TableRef takes part in a JOIN, analyzeJoin() is also called; it calls registerConjunct() on the Analyzer and fills in several of its members: conjuncts, tuplePredicates (the TupleId-to-conjunct mapping), slotPredicates (the SlotId-to-conjunct mapping), and eqJoinConjuncts.
- Process the SELECT list (keyword select, and aggregate functions such as MAX(), AVG(), ...): determine which items the statement selects; each item is an object of an Expr subclass, and the items are placed into the resultExprs array and colLabels. Every Expr in resultExprs is then analyzed recursively down to the leaves of its tree, registering SlotRefs and the like with the Analyzer.
- Analyze the WHERE clause (keyword where): first recursively analyze the Expr tree of the clause, then call registerConjunct() on the Analyzer to fill in the same members as in step 1, plus whereClauseConjuncts.
- Process the sort information (keyword order by): first resolve aliases and ordinals, then extract the Exprs from the ORDER BY clause into orderingExprs, recursively analyze their Expr trees, and finally create a SortInfo object.
- Process the aggregation information (keywords group by, having, and aggregates such as avg, max): first recursively analyze the Exprs in the GROUP BY clause; then, if there is a HAVING clause, treat it like a WHERE clause, i.e. analyze its Expr tree and registerConjunct() it with the Analyzer.
- Process InlineViews.
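As a concrete illustration (the query is hypothetical), consider select t1.a, max(t2.b) from t1 join t2 on t1.id = t2.id where t1.c > 10 group by t1.a order by 1: step 1 registers t1 and t2, stores and analyzes the on condition in t2's TableRef, and registers t1.id = t2.id as an eqJoinConjunct; step 2 turns t1.a and max(t2.b) into resultExprs/colLabels entries and registers their SlotRefs; step 3 registers t1.c > 10 into conjuncts and whereClauseConjuncts; step 4 resolves the ordinal 1 to t1.a and builds the SortInfo; step 5 analyzes the group by expression and the aggregate.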
With that, lexical and syntactic analysis are complete. Back in frontend.createExecRequest() (Frontend.java), the members of the TExecRequest are filled in. The code (in part) is:
/**
 * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
 */
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
    throws ImpalaException {
  // Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
  AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
  EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
  timeline.markEvent("Analysis finished");
  // ...
  Preconditions.checkNotNull(analysisResult.getStmt());
  TExecRequest result = new TExecRequest();
  result.setQuery_options(queryCtx.request.getQuery_options());
  result.setAccess_events(analysisResult.getAccessEvents());
  result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();

  if (analysisResult.isCatalogOp()) {
    result.stmt_type = TStmtType.DDL;
    createCatalogOpRequest(analysisResult, result);
    String jsonLineageGraph = analysisResult.getJsonLineageGraph();
    if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
      result.catalog_op_request.setLineage_graph(jsonLineageGraph);
    }
    // All DDL operations except for CTAS are done with analysis at this point.
    if (!analysisResult.isCreateTableAsSelectStmt()) return result;
  } else if (analysisResult.isLoadDataStmt()) {
    result.stmt_type = TStmtType.LOAD;
    result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
        new TColumn("summary", Type.STRING.toThrift()))));
    result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
    // ...
  } else if (analysisResult.isSetStmt()) {
    result.stmt_type = TStmtType.SET;
    result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
        new TColumn("option", Type.STRING.toThrift()),
        new TColumn("value", Type.STRING.toThrift()))));
    result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
    // ...
  }
If the statement is a DDL/catalog command (use, show tables, show databases, describe, ...), createCatalogOpRequest() is called. If it is a LOAD DATA or SET statement, the corresponding result-set metadata is set and the statement is converted to its Thrift form.
Execution Plan Generation
The remaining case is a query or DML statement, for which a TQueryExecRequest has to be created and filled in. That part of the code is:
/**
 * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
 */
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
    throws ImpalaException {
  // ...
  // Create a TQueryExecRequest if the statement is a query, DML or CTAS statement
  Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
      || analysisResult.isCreateTableAsSelectStmt());

  TQueryExecRequest queryExecRequest = new TQueryExecRequest();
  // ...
  LOG.debug("create plan");
  Planner planner = new Planner(analysisResult, queryCtx);
  // Generate the execution plan (PlanNodes and PlanFragments) from the analyzed syntax
  // tree: the Planner turns the parse tree produced by SQL analysis into plan fragments
  // that can be executed on the individual backends.
  ArrayList<PlanFragment> fragments = planner.createPlan();

  List<ScanNode> scanNodes = Lists.newArrayList();
  // Map from each fragment in queryExecRequest.fragments to its index in that list;
  // queryExecRequest.dest_fragment_idx needs this mapping.
  Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();

  for (int fragmentId = 0; fragmentId < fragments.size(); ++fragmentId) {
    PlanFragment fragment = fragments.get(fragmentId);
    Preconditions.checkNotNull(fragment.getPlanRoot());
    fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
    fragmentIdx.put(fragment, fragmentId);
  }
  // ...
createPlan() above is the most important function in the frontend: it generates the execution plan from the result of SQL analysis and the query options passed in by the client. The plan is represented as an array of PlanFragments, which is eventually serialized into TQueryExecRequest.fragments and handed to the backend coordinator for scheduling and execution. Let's look at the implementation of createPlan() (Planner.java):
/**
 * Returns a list of plan fragments for executing an analyzed parse tree.
 * May return a single-node or distributed executable plan.
 */
public ArrayList<PlanFragment> createPlan() throws ImpalaException {
  SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
  DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
  // First generate the single-node execution plan tree
  PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
  ctx_.getRootAnalyzer().getTimeline().markEvent("Single node plan created");
  ArrayList<PlanFragment> fragments = null;

  // Determine the maximum number of rows processed by any node in the plan tree
  MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
  singleNodePlan.accept(visitor);
  long maxRowsProcessed = visitor.get() == -1 ? Long.MAX_VALUE : visitor.get();
  boolean isSmallQuery =
      maxRowsProcessed < ctx_.getQueryOptions().exec_single_node_rows_threshold;
  if (isSmallQuery) {
    // Execute on a single node and disable codegen for small results
    ctx_.getQueryOptions().setNum_nodes(1);
    ctx_.getQueryOptions().setDisable_codegen(true);
    if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
        maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) {
      // Only one scanner thread for small queries
      ctx_.getQueryOptions().setNum_scanner_threads(1);
    }
  }

  if (ctx_.isSingleNodeExec()) {
    // single-node execution: the whole plan is one unpartitioned fragment
    fragments = Lists.newArrayList(new PlanFragment(
        ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED));
  } else {
    // create distributed plan
    fragments = distributedPlanner.createPlanFragments(singleNodePlan);
  }

  // The last fragment is the root fragment
  PlanFragment rootFragment = fragments.get(fragments.size() - 1);
  if (ctx_.isInsertOrCtas()) {
    InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
    if (!ctx_.isSingleNodeExec()) {
      // repartition on partition keys
      rootFragment = distributedPlanner.createInsertFragment(
          rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
    }
    // set up table sink for root fragment
    rootFragment.setSink(insertStmt.createDataSink());
  }

  ColumnLineageGraph graph = ctx_.getRootAnalyzer().getColumnLineageGraph();
  List<Expr> resultExprs = null;
  Table targetTable = null;
  if (ctx_.isInsertOrCtas()) {
    InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
    resultExprs = insertStmt.getResultExprs();
    targetTable = insertStmt.getTargetTable();
    graph.addTargetColumnLabels(targetTable);
  } else {
    resultExprs = ctx_.getQueryStmt().getResultExprs();
    graph.addTargetColumnLabels(ctx_.getQueryStmt().getColLabels());
  }

  resultExprs = Expr.substituteList(resultExprs,
      rootFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer(), true);
  rootFragment.setOutputExprs(resultExprs);
  LOG.debug("desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString());
  LOG.debug("resultexprs: " + Expr.debugString(rootFragment.getOutputExprs()));
  LOG.debug("finalize plan fragments");
  for (PlanFragment fragment: fragments) {
    fragment.finalize(ctx_.getRootAnalyzer());
  }

  Collections.reverse(fragments);
  ctx_.getRootAnalyzer().getTimeline().markEvent("Distributed plan created");

  if (RuntimeEnv.INSTANCE.computeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
    // Compute the column lineage graph
    if (ctx_.isInsertOrCtas()) {
      Preconditions.checkNotNull(targetTable);
      List<Expr> exprs = Lists.newArrayList();
      if (targetTable instanceof HBaseTable) {
        exprs.addAll(resultExprs);
      } else {
        exprs.addAll(ctx_.getAnalysisResult().getInsertStmt().getPartitionKeyExprs());
        exprs.addAll(resultExprs.subList(0,
            targetTable.getNonClusteringColumns().size()));
      }
      graph.computeLineageGraph(exprs, ctx_.getRootAnalyzer());
    } else {
      graph.computeLineageGraph(resultExprs, ctx_.getRootAnalyzer());
    }
    LOG.trace("lineage: " + graph.debugString());
    ctx_.getRootAnalyzer().getTimeline().markEvent("Lineage info computed");
  }
  return fragments;
}
createPlan consists of two main parts: createSingleNodePlan and createPlanFragments. The first builds a single-node plan tree, where the whole plan executes on one node (the coordinator); the second builds the distributed plan tree, whose fragments can be assigned to different nodes. Let's first look at SingleNodePlanner.createSingleNodePlan() (SingleNodePlanner.java). It creates the single-node execution plan tree from the analyzed syntax tree in the planner context and returns its root. The planner walks the parse tree recursively. In the top-down phase, for every query statement it:
- materializes the slots required for evaluating expressions of that statement
- migrates conjuncts from parent blocks into inline views and union operands
In the bottom-up phase it generates the plan tree for every query statement:
- performs join-order optimization when generating the plan of the FROM clause of a select statement; this requires that all materialized slots are known so that row sizes can be estimated accurately for cost-based join ordering
- assigns the conjuncts that can be evaluated at each node and computes that node's stats (cardinality, etc.)
- applies the combined expression substitution map of the child plan nodes; if a plan node re-maps its input, it sets a substitution map to be applied by its parents
The code is as follows:
/**
 * Generates and returns the root of the single-node plan for the analyzed parse tree
 * in the planner context.
 */
public PlanNode createSingleNodePlan() throws ImpalaException {
  QueryStmt queryStmt = ctx_.getQueryStmt();
  // Use the stmt's analyzer which is not necessarily the root analyzer
  // to detect empty result sets.
  Analyzer analyzer = queryStmt.getAnalyzer();
  analyzer.computeEquivClasses();
  analyzer.getTimeline().markEvent("Equivalence classes computed");

  // Mark slots referenced by output exprs as materialized, prior to generating the
  // plan tree.
  // We need to mark the result exprs of the topmost select block as materialized, so
  // that PlanNode.init() can compute the final mem layout of materialized tuples
  // (the byte size of tuples is needed for cost computations).
  // TODO: instead of materializing everything produced by the plan root, derive
  // referenced slots from destination fragment and add a materialization node
  // if not all output is needed by destination fragment
  // TODO 2: should the materialization decision be cost-based?
  if (queryStmt.getBaseTblResultExprs() != null) {
    analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
  }

  LOG.trace("desctbl: " + analyzer.getDescTbl().debugString());
  PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer,
      ctx_.getQueryOptions().isDisable_outermost_topn());
  Preconditions.checkNotNull(singleNodePlan);
  return singleNodePlan;
}
This is implemented via the private createQueryPlan() (SingleNodePlanner.java), which creates the plan tree for single-node execution, generating PlanNodes for the Select/Project/Join/Union [All]/Group by/Having/Order by clauses of the query statement:
/**
 * Create plan tree for single-node execution. Generates PlanNodes for the
 * Select/Project/Join/Union [All]/Group by/Having/Order by clauses of the query stmt.
 */
private PlanNode createQueryPlan(QueryStmt stmt, Analyzer analyzer, boolean disableTopN)
    throws ImpalaException {
  // The analyzer detects whether the result set is empty; if so, return an empty node directly
  if (analyzer.hasEmptyResultSet()) return createEmptyNode(stmt, analyzer);

  PlanNode root;
  if (stmt instanceof SelectStmt) {  // a select statement
    SelectStmt selectStmt = (SelectStmt) stmt;
    root = createSelectPlan(selectStmt, analyzer);

    // insert possible AnalyticEvalNode before SortNode
    if (((SelectStmt) stmt).getAnalyticInfo() != null) {
      AnalyticInfo analyticInfo = selectStmt.getAnalyticInfo();
      ArrayList<TupleId> stmtTupleIds = Lists.newArrayList();
      stmt.getMaterializedTupleIds(stmtTupleIds);
      AnalyticPlanner analyticPlanner =
          new AnalyticPlanner(stmtTupleIds, analyticInfo, analyzer, ctx_);
      List<Expr> inputPartitionExprs = Lists.newArrayList();
      AggregateInfo aggInfo = selectStmt.getAggInfo();
      root = analyticPlanner.createSingleNodePlan(root,
          aggInfo != null ? aggInfo.getGroupingExprs() : null, inputPartitionExprs);
      if (aggInfo != null && !inputPartitionExprs.isEmpty()) {
        // analytic computation will benefit from a partition on inputPartitionExprs
        aggInfo.setPartitionExprs(inputPartitionExprs);
      }
    }
  } else {  // otherwise, create a union plan
    Preconditions.checkState(stmt instanceof UnionStmt);
    root = createUnionPlan((UnionStmt) stmt, analyzer);
  }

  // Avoid adding a sort node if the sort tuple has no materialized slots
  boolean sortHasMaterializedSlots = false;
  if (stmt.evaluateOrderBy()) {
    for (SlotDescriptor sortSlotDesc:
        stmt.getSortInfo().getSortTupleDescriptor().getSlots()) {
      if (sortSlotDesc.isMaterialized()) {
        sortHasMaterializedSlots = true;
      }
    }
  }

  if (stmt.evaluateOrderBy() && sortHasMaterializedSlots) {
    long limit = stmt.getLimit();
    // TODO: External sort could be used for very large limits
    // not just unlimited order-by
    boolean useTopN = stmt.hasLimit() && !disableTopN;
    root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),
        useTopN, stmt.getOffset());
    Preconditions.checkState(root.hasValidStats());
    // ...
  }
  // ...
  root.setLimit(stmt.getLimit());
  root.computeStats(analyzer);
  return root;
}
SingleNodePlanner.createSelectPlan() (SingleNodePlanner.java) creates the tree of PlanNodes implementing the Select/Project/Join/Group by/Having clauses of a select query block:
/**
 * Create tree of PlanNodes that implements the Select/Project/Join/Group by/Having
 * of the selectStmt query block.
 */
private PlanNode createSelectPlan(SelectStmt selectStmt, Analyzer analyzer)
    throws ImpalaException {
  // no from clause -> materialize the select's exprs with a UnionNode
  // If the select statement does not reference any table, create a constant select plan
  if (selectStmt.getTableRefs().isEmpty()) {
    return createConstantSelectPlan(selectStmt, analyzer);
  }

  // Slot materialization:
  // We need to mark all slots as materialized that are needed during the execution
  // of selectStmt, and we need to do that prior to creating plans for the TableRefs
  // (because createTableRefNode() might end up calling computeMemLayout() on one or
  // more TupleDescriptors, at which point all referenced slots need to be marked).
  //
  // For non-join predicates, slots are marked as follows:
  // - for base table scan predicates, this is done directly by ScanNode.init(), which
  //   can do a better job because it doesn't need to materialize slots that are only
  //   referenced for partition pruning, for instance
  // - for inline views, non-join predicates are pushed down, at which point the
  //   process repeats itself.
  selectStmt.materializeRequiredSlots(analyzer);

  ArrayList<TupleId> rowTuples = Lists.newArrayList();
  // collect output tuples of subtrees
  for (TableRef tblRef: selectStmt.getTableRefs()) {
    rowTuples.addAll(tblRef.getMaterializedTupleIds());
  }

  // If the Select/Project/Join portion of this select statement returns an empty
  // result set, create an aggregation plan over an empty set that still satisfies
  // the select statement.
  // Make sure the slots of the aggregation exprs and the tuples that they reference
  // are materialized (see IMPALA-1960).
  if (analyzer.hasEmptySpjResultSet()) {
    PlanNode emptySetNode = new EmptySetNode(ctx_.getNextNodeId(), rowTuples);
    emptySetNode.init(analyzer);
    emptySetNode.setOutputSmap(selectStmt.getBaseTblSmap());
    return createAggregationPlan(selectStmt, analyzer, emptySetNode);
  }

  // Create a plan for each table ref; a list (rather than a map) is used here so that
  // the TableRefs are traversed in a deterministic order when the join plan is generated
  List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
  for (TableRef ref: selectStmt.getTableRefs()) {
    PlanNode plan = createTableRefNode(analyzer, ref);
    Preconditions.checkState(plan != null);
    refPlans.add(new Pair(ref, plan));
  }
  // save state of conjunct assignment; needed for join plan generation
  for (Pair<TableRef, PlanNode> entry: refPlans) {
    entry.second.setAssignedConjuncts(analyzer.getAssignedConjuncts());
  }

  PlanNode root = null;
  // If enough statistics are available (e.g. the sizes of the tables taking part in
  // the join), create the cheapest join plan
  if (!selectStmt.getSelectList().isStraightJoin()) {
    Set<ExprId> assignedConjuncts = analyzer.getAssignedConjuncts();
    root = createCheapestJoinPlan(analyzer, refPlans);
    if (root == null) analyzer.setAssignedConjuncts(assignedConjuncts);
  }
  // Otherwise, create the join plan in the order the tables appear in the FROM clause
  if (selectStmt.getSelectList().isStraightJoin() || root == null) {
    // we didn't have enough stats to do a cost-based join plan, or the STRAIGHT_JOIN
    // keyword was in the select list: use the FROM clause order instead
    root = createFromClauseJoinPlan(analyzer, refPlans);
    Preconditions.checkNotNull(root);
  }

  // If there is an aggregation, create the aggregation plan
  if (selectStmt.getAggInfo() != null) {
    root = createAggregationPlan(selectStmt, analyzer, root);
  }
  // ...
  // All the conjuncts_ should be assigned at this point.
  // TODO: Re-enable this check here and/or elswehere.
  //Preconditions.checkState(!analyzer.hasUnassignedConjuncts());
The main private methods called from the code above are createTableRefNode(), createCheapestJoinPlan(), createFromClauseJoinPlan() and createAggregationPlan(). Their implementations follow.
createTableRefNode()
/**
 * Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef,
 * CollectionTableRef or an InlineViewRef.
 */
private PlanNode createTableRefNode(Analyzer analyzer, TableRef tblRef)
    throws ImpalaException {
  if (tblRef instanceof BaseTableRef || tblRef instanceof CollectionTableRef) {
    return createScanNode(analyzer, tblRef);
  } else if (tblRef instanceof InlineViewRef) {
    return createInlineViewPlan(analyzer, (InlineViewRef) tblRef);
  }
  throw new InternalException(
      "Unknown TableRef node: " + tblRef.getClass().getSimpleName());
}
createCheapestJoinPlan()
/**
 * Returns the cheapest plan that materializes the joins of all TblRefs in refPlans.
 * Assumes that refPlans lists the tables in the order in which they originally
 * appeared in the query.
 * - the plan is executable, ie, all non-cross joins have equi-join predicates
 * - the leftmost scan is over the largest of the inputs for which we can still
 *   construct an executable plan (the leftmost table is the largest one)
 * - all rhs's (right-hand sides) are in decreasing order of selectiveness (percentage
 *   of rows they eliminate)
 * - outer/cross/semi joins: rhs serialized size is < lhs serialized size (the right-hand
 *   table is smaller than the left-hand one); enforced via join inversion, if necessary
 * Returns null if we can't create an executable plan.
 */
private PlanNode createCheapestJoinPlan(
    Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
    throws ImpalaException {
  LOG.trace("createCheapestJoinPlan");
  if (refPlans.size() == 1) return refPlans.get(0).second;

  // collect eligible candidates for the leftmost input; list contains
  // (plan, materialized size)
  ArrayList<Pair<TableRef, Long>> candidates = Lists.newArrayList();
  for (Pair<TableRef, PlanNode> entry: refPlans) {
    TableRef ref = entry.first;
    JoinOperator joinOp = ref.getJoinOp();

    // The rhs table of an outer/semi join can appear as the left-most input if we
    // invert the lhs/rhs and the join op. However, we may only consider this inversion
    // for the very first join in refPlans, otherwise we could reorder tables/joins
    // across outer/semi joins which is generally incorrect. The null-aware
    // left anti-join operator is never considered for inversion because we can't
    // execute the null-aware right anti-join efficiently.
    // TODO: Allow the rhs of any cross join as the leftmost table. This needs careful
    // consideration of the joinOps that result from such a re-ordering (IMPALA-1281).
    if (((joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) &&
        ref != refPlans.get(1).first) || joinOp.isNullAwareLeftAntiJoin()) {
      // ref cannot appear as the leftmost input
      continue;
    }

    PlanNode plan = entry.second;
    if (plan.getCardinality() == -1) {
      // use 0 for the size to avoid it becoming the leftmost input
      // TODO: Consider raw size of scanned partitions in the absence of stats.
      candidates.add(new Pair(ref, new Long(0)));
      LOG.trace("candidate " + ref.getUniqueAlias() + ": 0");
      continue;
    }
    Preconditions.checkNotNull(ref.getDesc());
    long materializedSize =
        (long) Math.ceil(plan.getAvgRowSize() * (double) plan.getCardinality());
    candidates.add(new Pair(ref, new Long(materializedSize)));
    LOG.trace("candidate " + ref.getUniqueAlias() + ": " + Long.toString(materializedSize));
  }
  if (candidates.isEmpty()) return null;

  // order candidates by descending materialized size; we want to minimize the memory
  // consumption of the materialized hash tables required for the join sequence
  Collections.sort(candidates,
      new Comparator<Pair<TableRef, Long>>() {
        public int compare(Pair<TableRef, Long> a, Pair<TableRef, Long> b) {
          long diff = b.second - a.second;
          return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
        }
      });

  // Create the join plan from the candidate tables, which are already ordered by size
  for (Pair<TableRef, Long> candidate: candidates) {
    PlanNode result = createJoinPlan(analyzer, candidate.first, refPlans);
    if (result != null) return result;
  }
createFromClauseJoinPlan()
/**
 * Returns a plan that joins the tables in the order in which they appear in the
 * FROM clause.
 */
private PlanNode createFromClauseJoinPlan(
    Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
    throws ImpalaException {
  // create left-deep sequence of binary hash joins; assign node ids as we go along
  Preconditions.checkState(!refPlans.isEmpty());
  PlanNode root = refPlans.get(0).second;
  for (int i = 1; i < refPlans.size(); ++i) {
    TableRef innerRef = refPlans.get(i).first;
    PlanNode innerPlan = refPlans.get(i).second;
    root = createJoinNode(analyzer, root, innerPlan, null, innerRef);
    root.setId(ctx_.getNextNodeId());
  }
  return root;
}
createAggregationPlan()
/**
 * Returns a new AggregationNode that materializes the aggregation of the given stmt.
 * Assigns conjuncts from the Having clause to the returned node.
 */
private PlanNode createAggregationPlan(SelectStmt selectStmt, Analyzer analyzer,
    PlanNode root) throws InternalException {
  Preconditions.checkState(selectStmt.getAggInfo() != null);
  // add aggregation, if required
  AggregateInfo aggInfo = selectStmt.getAggInfo();
  root = new AggregationNode(ctx_.getNextNodeId(), root, aggInfo);
  root.init(analyzer);
  Preconditions.checkState(root.hasValidStats());
  // if we're computing DISTINCT agg fns, the analyzer already created the
  // 2nd phase agg info
  if (aggInfo.isDistinctAgg()) {
    ((AggregationNode)root).unsetNeedsFinalize();
    // The output of the 1st phase agg is the 1st phase intermediate.
    ((AggregationNode)root).setIntermediateTuple();
    root = new AggregationNode(ctx_.getNextNodeId(), root,
        aggInfo.getSecondPhaseDistinctAggInfo());
    root.init(analyzer);
    Preconditions.checkState(root.hasValidStats());
  }
  // assign the Having-clause conjuncts
  root.assignConjuncts(analyzer);
  return root;
}
createCheapestJoinPlan() and createFromClauseJoinPlan() above both rely on createJoinNode() and createJoinPlan(), which are implemented as follows:
createJoinNode()
/**
 * Creates a node that joins outer and inner. One of the two may be a plan created
 * from a table ref, but not both; the corresponding outer/inner tableRef must not
 * be null.
 */
private PlanNode createJoinNode(
    Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef outerRef,
    TableRef innerRef) throws ImpalaException {
  Preconditions.checkState(innerRef != null ^ outerRef != null);
  TableRef tblRef = (innerRef != null) ? innerRef : outerRef;

  List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
  List<Expr> eqJoinPredicates = Lists.newArrayList();
  // get eq join predicates for the TableRefs' ids (not the PlanNodes' ids, which
  // may be different)
  if (innerRef != null) {
    getHashLookupJoinConjuncts(
        analyzer, outer.getTblRefIds(), innerRef, eqJoinConjuncts, eqJoinPredicates);
    // Outer joins should only use On-clause predicates as eqJoinConjuncts.
    if (!innerRef.getJoinOp().isOuterJoin()) {
      analyzer.createEquivConjuncts(outer.getTblRefIds(), innerRef.getId(),
          eqJoinConjuncts);
    }
  } else {
    getHashLookupJoinConjuncts(
        analyzer, inner.getTblRefIds(), outerRef, eqJoinConjuncts, eqJoinPredicates);
    // Outer joins should only use On-clause predicates as eqJoinConjuncts.
    if (!outerRef.getJoinOp().isOuterJoin()) {
      analyzer.createEquivConjuncts(inner.getTblRefIds(), outerRef.getId(),
          eqJoinConjuncts);
    }
    // Reverse the lhs/rhs of the join conjuncts.
    for (BinaryPredicate eqJoinConjunct: eqJoinConjuncts) {
      Expr swapTmp = eqJoinConjunct.getChild(0);
      eqJoinConjunct.setChild(0, eqJoinConjunct.getChild(1));
      eqJoinConjunct.setChild(1, swapTmp);
    }
  }

  if (eqJoinConjuncts.isEmpty()) {
    // Since our only implementation of semi and outer joins is hash-based, and we do
    // not re-order semi and outer joins, we must have eqJoinConjuncts here to execute
    // those joins.
    // TODO: Revisit when we add more semi/join implementations. Pick up and pass in
    // the otherJoinConjuncts.
    if (tblRef.getJoinOp().isOuterJoin() ||
        tblRef.getJoinOp().isSemiJoin()) {
      throw new NotImplementedException(
          String.format("%s join with '%s' without equi-join " +
              "conjuncts is not supported.",
              tblRef.getJoinOp().isOuterJoin() ? "Outer" : "Semi",
              innerRef.getUniqueAlias()));
    }
    CrossJoinNode result =
        new CrossJoinNode(outer, inner, tblRef, Collections.<Expr>emptyList());
    result.init(analyzer);
    return result;
  }

  if (tblRef.getJoinOp() == JoinOperator.CROSS_JOIN) {
    tblRef.setJoinOp(JoinOperator.INNER_JOIN);
  }

  analyzer.markConjunctsAssigned(eqJoinPredicates);

  List<Expr> otherJoinConjuncts = Lists.newArrayList();
  if (tblRef.getJoinOp().isOuterJoin()) {  // outer join
    // Also assign conjuncts from On clause. All remaining unassigned conjuncts
    // that can be evaluated by this join are assigned in createSelectPlan().
    otherJoinConjuncts = analyzer.getUnassignedOjConjuncts(tblRef);
  } else if (tblRef.getJoinOp().isSemiJoin()) {  // semi join
    // Unassigned conjuncts bound by the invisible tuple id of a semi join must have
    // come from the join's On-clause, and therefore, must be added to the other join
    // conjuncts to produce correct results.
    otherJoinConjuncts =
        analyzer.getUnassignedConjuncts(tblRef.getAllTupleIds(), false);
    if (tblRef.getJoinOp().isNullAwareLeftAntiJoin()) {  // null-aware left anti join
      boolean hasNullMatchingEqOperator = false;
      // Keep only the null-matching eq conjunct in the eqJoinConjuncts and move
      // all the others in otherJoinConjuncts. The BE relies on this
      // separation for correct execution of the null-aware left anti join.
      Iterator<BinaryPredicate> it = eqJoinConjuncts.iterator();
      while (it.hasNext()) {
        BinaryPredicate conjunct = it.next();
        if (!conjunct.isNullMatchingEq()) {
          otherJoinConjuncts.add(conjunct);
          it.remove();
        } else {
          // Only one null-matching eq conjunct is allowed
          Preconditions.checkState(!hasNullMatchingEqOperator);
          hasNullMatchingEqOperator = true;
        }
      }
      Preconditions.checkState(hasNullMatchingEqOperator);
    }
  }
  analyzer.markConjunctsAssigned(otherJoinConjuncts);

  HashJoinNode result =
      new HashJoinNode(outer, inner, tblRef, eqJoinConjuncts, otherJoinConjuncts);
  result.init(analyzer);
createJoinPlan()
/**
 * Returns a plan with leftmostRef's plan as its leftmost input; the joins
 * are in decreasing order of selectiveness (percentage of rows they eliminate).
 * The leftmostRef's join will be inverted if it is an outer/semi/cross join.
 */
private PlanNode createJoinPlan(
    Analyzer analyzer, TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
    throws ImpalaException {

  LOG.trace("createJoinPlan: " + leftmostRef.getUniqueAlias());
  // the refs that still need to be joined
  List<Pair<TableRef, PlanNode>> remainingRefs = Lists.newArrayList();
  PlanNode root = null;  // root of accumulated join plan
  for (Pair<TableRef, PlanNode> entry: refPlans) {
    if (entry.first == leftmostRef) {
      root = entry.second;
    } else {
      remainingRefs.add(entry);
    }
  }
  Preconditions.checkNotNull(root);
  // Refs that have already been joined; the union of joinedRefs and the refs in
  // remainingRefs is the set of all table refs.
  Set<TableRef> joinedRefs = Sets.newHashSet();
  joinedRefs.add(leftmostRef);

  // If the leftmost TblRef is an outer/semi/cross join, invert it
  boolean planHasInvertedJoin = false;
  if (leftmostRef.getJoinOp().isOuterJoin()
      || leftmostRef.getJoinOp().isSemiJoin()
      || leftmostRef.getJoinOp().isCrossJoin()) {
    // TODO: Revisit the interaction of join inversion here and the analysis state
    // that is changed in analyzer.invertOuterJoin(). Changing the analysis state
    // should not be necessary because the semantics of an inverted outer join do
    // not change.
    leftmostRef.invertJoin(refPlans, analyzer);
    planHasInvertedJoin = true;
  }

  long numOps = 0;
  int i = 0;
  while (!remainingRefs.isEmpty()) {
    // Each step of the join chain minimizes the number of result rows, and thereby
    // the number of hash table lookups
    PlanNode newRoot = null;
    Pair<TableRef, PlanNode> minEntry = null;
    for (Pair<TableRef, PlanNode> entry: remainingRefs) {
      TableRef ref = entry.first;
      LOG.trace(Integer.toString(i) + " considering ref " + ref.getUniqueAlias());

      // Determine whether we can or must consider this join at this point in the plan.
      // Place outer/semi joins at a fixed position in the plan tree (IMPALA-860),
      // s.t. all the tables appearing to the left/right of an outer/semi join in
      // the original query still remain to the left/right after join ordering. This
      // prevents join re-ordering across outer/semi joins which is generally wrong.
      // The checks below relies on remainingRefs being in the order as they originally
      // appeared in the query.
      JoinOperator joinOp = ref.getJoinOp();
      if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
        List<TupleId> currentTids = Lists.newArrayList(root.getTblRefIds());
        currentTids.add(ref.getId());
        // Place outer/semi joins at a fixed position in the plan tree. We know that
        // the join resulting from 'ref' must become the new root if the current
        // root materializes exactly those tuple ids corresponding to TableRefs
        // appearing to the left of 'ref' in the original query.
        List<TupleId> tableRefTupleIds = ref.getAllTupleIds();
        if (!currentTids.containsAll(tableRefTupleIds) ||
            !tableRefTupleIds.containsAll(currentTids)) {
          // Do not consider the remaining table refs to prevent incorrect re-ordering
          // of tables across outer/semi/anti joins.
          break;
        }
      } else if (ref.getJoinOp().isCrossJoin()) {
        if (!joinedRefs.contains(ref.getLeftTblRef())) continue;
      }

      PlanNode rhsPlan = entry.second;
      analyzer.setAssignedConjuncts(root.getAssignedConjuncts());

      boolean invertJoin = false;
      if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
        // Invert the join if doing so reduces the size of build-side hash table
        // (may also reduce network costs depending on the join strategy).
        // Only consider this optimization if both the lhs/rhs cardinalities are known.
        // The null-aware left anti-join operator is never considered for inversion
        // because we can't execute the null-aware right anti-join efficiently.
        long lhsCard = root.getCardinality();
        long rhsCard = rhsPlan.getCardinality();
        if (lhsCard != -1 && rhsCard != -1 &&
            lhsCard * root.getAvgRowSize() < rhsCard * rhsPlan.getAvgRowSize() &&
            !joinOp.isNullAwareLeftAntiJoin()) {
          invertJoin = true;
        }
      }
      PlanNode candidate = null;
      if (invertJoin) {
        ref.setJoinOp(ref.getJoinOp().invert());
        candidate = createJoinNode(analyzer, rhsPlan, root, ref, null);
        planHasInvertedJoin = true;
      } else {
        candidate = createJoinNode(analyzer, root, rhsPlan, null, ref);
      }
      if (candidate == null) continue;
      LOG.trace("cardinality=" + Long.toString(candidate.getCardinality()));

      // Use 'candidate' as the new root; don't consider any other table refs at this
      // position in the plan.
      if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
        newRoot = candidate;
        minEntry = entry;
        break;
      }

      // Prefer a hash join over a cross join, due to limited costing infrastructure
      if (newRoot == null
          || (candidate.getClass().equals(newRoot.getClass())
              && candidate.getCardinality() < newRoot.getCardinality())
          || (candidate instanceof HashJoinNode && newRoot instanceof CrossJoinNode)) {
        newRoot = candidate;
        minEntry = entry;
      }
    }
    if (newRoot == null) {
      // Currently, it should not be possible to invert a join for a plan that turns
      // out to be non-executable because (1) the joins we consider for inversion are
      // barriers in the join order, and (2) the caller of this function only considers
      // other leftmost table refs if a plan turns out to be non-executable.
      // TODO: This preconditions check will need to be changed to undo the in-place
      // modifications made to table refs for join inversion, if the caller decides to
      // explore more leftmost table refs.
      Preconditions.checkState(!planHasInvertedJoin);
      return null;
    }

    // we need to insert every rhs row into the hash table and then look up
    // every lhs row
    long lhsCardinality = root.getCardinality();
    long rhsCardinality = minEntry.second.getCardinality();
    numOps += lhsCardinality + rhsCardinality;
    LOG.debug(Integer.toString(i) + " chose " + minEntry.first.getUniqueAlias()
        + " #lhs=" + Long.toString(lhsCardinality)
        + " #rhs=" + Long.toString(rhsCardinality)
        + " #ops=" + Long.toString(numOps));
    remainingRefs.remove(minEntry);
    joinedRefs.add(minEntry.first);
    root = newRoot;
    // assign id_ after running through the possible choices in order to end up
    // with a dense sequence of node ids
    root.setId(ctx_.getNextNodeId());
    analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
    ++i;
  }
  return root;
}
That is a rough overview of the createSingleNodePlan step.
Let's now return to createPlan() and look at how the distributed plan tree is built, i.e. the createPlanFragments step.
DistributedPlanner.createPlanFragments()
(DistributedPlanner.java)
generates multiple fragments from the single-node plan tree:
/**
 * The fragments are returned in a list such that the fragment at position i can only
 * consume the output of fragments at positions j > i.
 *
 * TODO: Take the data partitioning of the plan fragments into account; in particular,
 * coordinate the hash partitioning used for aggregation with the hash partitioning
 * used for analytic computation more than createQueryPlan() currently does.
 * (Such coordination only arises when aggregation and analytic computation occur in
 * the same select block.)
 */
public ArrayList<PlanFragment> createPlanFragments(
    PlanNode singleNodePlan) throws ImpalaException {
  Preconditions.checkState(!ctx_.isSingleNodeExec());
  AnalysisContext.AnalysisResult analysisResult = ctx_.getAnalysisResult();
  QueryStmt queryStmt = ctx_.getQueryStmt();
  ArrayList<PlanFragment> fragments = Lists.newArrayList();
  // For inserts or CTAS, leave the root fragment partitioned unless there is a limit;
  // otherwise merge everything into a single coordinator fragment so the result can
  // be passed back to the client.
  boolean isPartitioned = false;
  if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())
      && !singleNodePlan.hasLimit()) {
    Preconditions.checkState(!queryStmt.hasOffset());
    isPartitioned = true;
  }
  LOG.debug("create plan fragments");
  long perNodeMemLimit = ctx_.getQueryOptions().mem_limit;
  LOG.debug("memlimit=" + Long.toString(perNodeMemLimit));
  createPlanFragments(singleNodePlan, isPartitioned, perNodeMemLimit, fragments);
  return fragments;
}
That method calls the private member method DistributedPlanner.createPlanFragments() (DistributedPlanner.java), which returns the fragment that produces the result of 'root':
/**
 * Returns the fragment that produces the result of 'root'; recursively creates
 * all the input fragments of the returned fragment.
 * If a new fragment is created, it is appended to 'fragments', so that a fragment
 * appears after all of the fragments it depends on.
 * If 'isPartitioned' is false, the returned fragment is unpartitioned;
 * otherwise it may be partitioned, depending on whether its inputs are partitioned;
 * the partition function is derived from the inputs.
 */
private PlanFragment createPlanFragments(
    PlanNode root, boolean isPartitioned,
    long perNodeMemLimit, ArrayList<PlanFragment> fragments)
    throws InternalException, NotImplementedException {
  ArrayList<PlanFragment> childFragments = Lists.newArrayList();
  for (PlanNode child: root.getChildren()) {
    // Allow child fragments to be partitioned, unless they contain a limit clause
    // (a result set with a limit needs to be computed centrally);
    // merge later if needed.
    boolean childIsPartitioned = !child.hasLimit();
    // Recursively call createPlanFragments() and add the PlanFragments created for
    // the child to childFragments.
    childFragments.add(createPlanFragments(
        child, childIsPartitioned, perNodeMemLimit, fragments));
  }

  // Create a different kind of fragment depending on the node type of 'root'
  PlanFragment result = null;
  if (root instanceof ScanNode) {
    result = createScanFragment(root);
    fragments.add(result);
  } else if (root instanceof HashJoinNode) {
    Preconditions.checkState(childFragments.size() == 2);
    result = createHashJoinFragment(
        (HashJoinNode) root, childFragments.get(1), childFragments.get(0),
        perNodeMemLimit, fragments);
  } else if (root instanceof CrossJoinNode) {
    Preconditions.checkState(childFragments.size() == 2);
    result = createCrossJoinFragment(
        (CrossJoinNode) root, childFragments.get(1), childFragments.get(0),
        perNodeMemLimit, fragments);
  } else if (root instanceof SelectNode) {
    result = createSelectNodeFragment((SelectNode) root, childFragments);
  } else if (root instanceof UnionNode) {
    result = createUnionNodeFragment((UnionNode) root, childFragments, fragments);
  } else if (root instanceof AggregationNode) {
    result = createAggregationFragment(
        (AggregationNode) root, childFragments.get(0), fragments);
  } else if (root instanceof SortNode) {
    if (((SortNode) root).isAnalyticSort()) {
      // don't parallelize this like a regular SortNode
      result = createAnalyticFragment(
          (SortNode) root, childFragments.get(0), fragments);
    } else {
      result = createOrderByFragment(
          (SortNode) root, childFragments.get(0), fragments);
    }
  } else if (root instanceof AnalyticEvalNode) {
    result = createAnalyticFragment(root, childFragments.get(0), fragments);
  } else if (root instanceof EmptySetNode) {
    result = new PlanFragment(
        ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED);
  } else {
    throw new InternalException(
        "Cannot create plan fragment for this node type: " + root.getExplainString());
  }
  // move 'result' to end, it depends on all of its children
  fragments.remove(result);
  fragments.add(result);
  // If the result is partitioned but an unpartitioned fragment was requested, a
  // merge fragment also has to be created on top of it
  if (!isPartitioned && result.isPartitioned()) {
    result = createMergeFragment(result);
    fragments.add(result);
  }
  return result;
}
The method above calls a number of private create*Fragment() helpers; their implementations can be found in the source file
DistributedPlanner.java
They all return PlanFragment instances; the implementation of that class is in
PlanFragment.java
(A small sketch of the simplest of these helpers follows.)
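As an illustration of what these helpers look like, the simplest one essentially just wraps a scan node in a new fragment whose data partition reflects the scan. The following is only a sketch under that assumption, not a verbatim copy of the source:

// Sketch: wrap a ScanNode into its own fragment. The real createScanFragment() in
// DistributedPlanner.java may differ in details; DataPartition.RANDOM is an assumption
// expressing that the scan's output is partitioned by whichever scan ranges each
// backend happens to read.
private PlanFragment createScanFragment(PlanNode node) {
  return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.RANDOM);
}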
This concludes the rough overview of createPlanFragments.
Since createSingleNodePlan and createPlanFragments, the two most important parts of createPlan, have now both been covered, that is all for createPlan. Let's return to frontend.createExecRequest()
and look at what remains. The rest of frontend.createExecRequest() is:
/**
 * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
 */
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
    throws ImpalaException {
  // ...
  for (int i = 1; i < fragments.size(); ++i) {
    PlanFragment dest = fragments.get(i).getDestFragment();
    Integer idx = fragmentIdx.get(dest);
    Preconditions.checkState(idx != null);
    queryExecRequest.addToDest_fragment_idx(idx.intValue());
  }

  // Set scan ranges/locations for the scan nodes.
  // Also assemble list of tables names missing stats for assembling a warning message.
  LOG.debug("get scan range locations");
  Set<TTableName> tablesMissingStats = Sets.newTreeSet();
  for (ScanNode scanNode: scanNodes) {
    queryExecRequest.putToPer_node_scan_ranges(
        scanNode.getId().asInt(),
        scanNode.getScanRangeLocations());
    if (scanNode.isTableMissingStats()) {
      tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
    }
  }

  queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
  for (TTableName tableName: tablesMissingStats) {
    queryCtx.addToTables_missing_stats(tableName);
  }

  // Optionally disable spilling in the backend. Allow spilling if there are plan hints
  // or if all tables have stats.
  if (queryCtx.request.query_options.isDisable_unsafe_spills()
      && !tablesMissingStats.isEmpty()
      && !analysisResult.getAnalyzer().hasPlanHints()) {
    queryCtx.setDisable_spilling(true);
  }

  // Compute resource requirements, since the cost estimates of the scan nodes
  // depend on them.
  try {
    planner.computeResourceReqs(fragments, true, queryExecRequest);
  } catch (Exception e) {
    // ...
    LOG.error("Failed to compute resource requirements for query\n" +
        queryCtx.request.getStmt(), e);
  }

  // At this point all the fragment information has been set; serialize it to Thrift.
  for (PlanFragment fragment: fragments) {
    TPlanFragment thriftFragment = fragment.toThrift();
    queryExecRequest.addToFragments(thriftFragment);
  }

  // Use VERBOSE by default for all non-explain statements.
  TExplainLevel explainLevel = TExplainLevel.VERBOSE;
  // Use the query option for explain stmts and tests (e.g., planner tests).
  if (analysisResult.isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
    explainLevel = queryCtx.request.query_options.getExplain_level();
  }

  // Global query parameters to be set in each TPlanExecRequest.
  queryExecRequest.setQuery_ctx(queryCtx);

  explainString.append(
      planner.getExplainString(fragments, queryExecRequest, explainLevel));
  queryExecRequest.setQuery_plan(explainString.toString());
  queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());

  String jsonLineageGraph = analysisResult.getJsonLineageGraph();
  if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
    queryExecRequest.setLineage_graph(jsonLineageGraph);
  }

  if (analysisResult.isExplainStmt()) {
    // Return the EXPLAIN request
    createExplainRequest(explainString.toString(), result);
    return result;
  }

  result.setQuery_exec_request(queryExecRequest);

  if (analysisResult.isQueryStmt()) {
    // fill in the result set metadata
    LOG.debug("create result set metadata");
    result.stmt_type = TStmtType.QUERY;
    result.query_exec_request.stmt_type = result.stmt_type;
    TResultSetMetadata metadata = new TResultSetMetadata();
    QueryStmt queryStmt = analysisResult.getQueryStmt();
    int colCnt = queryStmt.getColLabels().size();
    for (int i = 0; i < colCnt; ++i) {
      TColumn colDesc = new TColumn();
      colDesc.columnName = queryStmt.getColLabels().get(i);
      colDesc.columnType = queryStmt.getResultExprs().get(i).getType().toThrift();
      metadata.addToColumns(colDesc);
    }
    result.setResult_set_metadata(metadata);
  } else {
    Preconditions.checkState(analysisResult.isInsertStmt() ||
        analysisResult.isCreateTableAsSelectStmt());

    // For CTAS the overall TExecRequest statement type is DDL, but the
    // query_exec_request should be DML
    result.stmt_type =
        analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
    result.query_exec_request.stmt_type = TStmtType.DML;

    // create finalization params of insert stmt
    InsertStmt insertStmt = analysisResult.getInsertStmt();
    if (insertStmt.getTargetTable() instanceof HdfsTable) {
      TFinalizeParams finalizeParams = new TFinalizeParams();
      finalizeParams.setIs_overwrite(insertStmt.isOverwrite());
      finalizeParams.setTable_name(insertStmt.getTargetTableName().getTbl());
      finalizeParams.setTable_id(insertStmt.getTargetTable().getId().asInt());
      String db = insertStmt.getTargetTableName().getDb();
      finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
      HdfsTable hdfsTable = (HdfsTable) insertStmt.getTargetTable();
      finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
      finalizeParams.setStaging_dir(
          hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
      queryExecRequest.setFinalize_params(finalizeParams);
    }
  }

  validateTableIds(analysisResult.getAnalyzer(), result);

  timeline.markEvent("Planning finished");
  result.setTimeline(analysisResult.getAnalyzer().getTimeline().toThrift());
At this point the FE work is done and a TExecRequest object is returned to the backend for execution.
Since the author has only recently started working with Impala, this analysis may contain mistakes; questions and suggestions are welcome.