这里我们从 BeeLine.execute 讲起.
接下来来到 BeeLine.dispatch, 这里的入参就是 sql 语句. 方法的最后调用了 Commands.sql, 然后调用到了 Commands.execute.
如下图所示, 这里是 Commands.execute 中的关键逻辑.
1. 调用 BeeLine.createStatement, 该方法间接调用了 HiveConnection.createStatement. 在后面的方法中构建了 HiveStatement.
2. 调用了 Commands.createLogRunnable, 在该方法中循环调用 HiveStatement.hasMoreLogs. 并且将方法 HiveStatement.getQueryLog 中获取到的数据使用 BeeLine.info 输出到控制台. 该方法封装为 Runnable 然后返回, 后面使用线程封装并调用.
3. 调用 HiveStatement.execute, 该方法首先调用 closeClientOperation 与 initFlags 重置了部分成员变量, 以便下一次调用.
4. 然后调用 Client.ExecuteStatement, 获取返回的 operationHandle, 并为下一次调用做准备.
5. 遍历调用 Client.GetOperationStatus, 直到获取的状态为 CLOSED_STATE 或 FINISHED_STATE, 此时就可以将变量 operationComplete 置为 true, 跳出循环.
6. 构造结果集 HiveQueryResultSet 并返回. 在构造期间我们需要注意这里会调用到 HiveQueryResultSet.retrieveSchema. 这里间接调用了 Client.GetResultSetMetadata.
7. 后面调用了 showRemainingLogsIfAny, 该方法间接调用到了 HiveStatement.getQueryLog, 该方法内部就调用了 Client.FetchResults.
这里有一个令人疑惑的地方 -- 构造了 logThread 线程后, 调用其 start 方法, 然后调用 HiveStatement.execute, 接着又调用了线程的 interrupt 与 join 方法. 这里更多的是在服务端需要跑 MR 任务时获取相关的任务状态.
1. 首先调用 logThread.start, 此时是为了前期对于任务的监听. 在 logThread 中循环的布尔条件, 也就是 HiveStatement.isLogBeingGenerated 此时一直为 true, 也就是该线程一直在运行.
2. 然后调用了 HiveStatement.execute, 该方法在执行完 Client.ExecuteStatement 后调用 Client.GetOperationStatus 来获取语句的执行情况, 如果语句的执行状态变更为 FINISHED_STATE, 则更新上面的变量值 HiveStatement.isLogBeingGenerated 为 false.
3. 调用 logThread.interrupt, 也就是说, 将 logThread 的中断值置为 true. 如果 logThread 此时处于 sleep 状态, 那么会抛出 InterruptedException 异常, 执行 showRemainingLogsIfAny 方法继续调用方法 hiveStatement.getQueryLog 来获取 job 状态信息. 如果 logThread 不处于休眠状态, 则会等到其下一次休眠时 (如果有的话), 同样会来到方法 showRemainingLogsIfAny, 继续查询 job 的状态.
4. 调用了 logThread.join, 其入参值为 10s, 也就是说, 如果 10s 后 logThread 仍然运行, 那么会停止监控.
在 BufferedRows 的构造方法中就已经将所有的数据取回. 这里主要调用了两个方法 --HiveQueryResultSet.next 与 Row 的构造方法.
1. 如下图所示, 这里是 HiveQueryResultSet.next 方法中比较重要的逻辑部分. 这里首先调用 Client.FetchResults 从服务端获取结果集, 然后通过方法 RowSetFactory.create 构造了 ColumnBasedSet(由于我们这里的版本号是 HIVE_CLI_SERVICE_PROTOCOL_V7, 因此构造的类是 ColumnBasedSet, 并且在该构造方法中完成了将返回的数据集封装到类型内成员变量的流程). 另外 ColumnBasedSet 覆写了接口 Iterable 中的 iterator 方法. 用于后面迭代将数据取出.
这里的 next 方法中的条件判断保证了只要有数据取回, 便会一直返回 true, 只有从服务端取回的数据为空时, 这里的返回结果才为 false.
2.Row 的构造方法, 如下图所示:
下图框选出该构造方法中的重点方法 --HiveBaseResultSet.getString. 这里会将上面方法中对成员变量 row 赋予的值取出为一行.
这里简单总结一下, 客户端调用 TCLIService.thrift 协议中的完成接口流程:
- OpenSession
- FetchResults
- CloseOperation
- ExecuteStatement
- GetOperationStatus
- GetResultSetMetadata
- FetchResults
- CloseOperation
- CloseSession
如果连接已经建立后, 这里只会调用上面中的 2-8 流程.
来源: https://www.cnblogs.com/letsfly/p/10579964.html