Post Views = 5
之前鼓捣 Hbase 的时候,觉得单机和伪分布式模式太 low 了,就在笔记本上用三个虚拟机搭建了一个 "完全分布式" 的 Hbase 环境 (心疼破本子一秒钟)。刚好趁这个元旦假期,我就研究了一下 Hadoop。Hadoop 也算是个巨无霸了,涉及了很多方面的功能。个人工作中有多个 RPC client 管理以及交互的场景,一直觉得设计的不太好。所以心里一直想研究一下优秀项目的多路 RPC 是如何实现的,然后计划一直搁置到现在。难得小假期,就拿手上的 Hadoop 开刀吧!这个 connections 就是 Client 的成员变量,代表着 Client 所建立的所有连接。此外 Client 还有一个叫 Call 内部类。Call 代表一次 RCP 调用,虽然 Hadoop 的 RPC 是直接基于 TCP 的,但上层使用起来和 REST 之类的 RPC 还是非常相似的。Call 的代码片段如下:
- private ConcurrentMap < ConnectionId,
- Connection > connections = new ConcurrentHashMap < >();
id 表示这次 RPC 的调用的编号,因为这里的 TCP RPC 是全双工的,所以需要一个序列标识。为了保证 Call 的 id 在单个连接中唯一,Client 定义了几个 AtomicInteger 变量。每个 RPC Call 都会把这个 id 带上,call 的 response 里面也会带上这个 id,这样客户端可以分发消息了。一个 Client 主要的数据结构如下图所示:
- /**
- * Class that represents an RPC call
- */
- static class Call {
- final int id; // call id
- final int retry; // retry count
- final Writable rpcRequest; // the serialized rpc request
- Writable rpcResponse; // null if rpc has error
- IOException error; // exception, null if success
- final RPC.RpcKind rpcKind; // Rpc EngineKind
- boolean done; // true when call is done
- private final Object externalHandler;
- private Call(RPC.RpcKind rpcKind, Writable param) {
- this.rpcKind = rpcKind;
- this.rpcRequest = param;
- final Integer id = callId.get();
- if (id == null) {
- this.id = nextCallId();
- } else {
- callId.set(null);
- this.id = id;
- }
- final Integer rc = retryCount.get();
- if (rc == null) {
- this.retry = 0;
- } else {
- this.retry = rc;
- }
- this.externalHandler = EXTERNAL_CALL_HANDLER.get();
- }
- }
其实这个逻辑结构显得挺简单的,主要的工作还是在 Connection 类中完成的。Connection 作为一个 Thread 的子类,它的 run() 方法其实就是不断的 read,然后根据 Response 中的 Call id 分发返回消息。在具体实现中, Connection 的 run 方法就是在 while 循环中不断 receiveRpcResponse()。
- private void receiveRpcResponse() {
- if (shouldCloseConnection.get()) {
- return;
- }
- touch();
- try {
- ByteBuffer bb = ipcStreams.readResponse();
- RpcWritable.Buffer packet = RpcWritable.Buffer.wrap(bb);
- RpcResponseHeaderProto header =
- packet.getValue(RpcResponseHeaderProto.getDefaultInstance());
- checkResponse(header);
- int callId = header.getCallId();
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + " got value #" + callId);
- RpcStatusProto status = header.getStatus();
- if (status == RpcStatusProto.SUCCESS) {
- Writable value = packet.newInstance(valueClass, conf);
- final Call call = calls.remove(callId);
- call.setRpcResponse(value);
- }
- // verify that packet length was correct
- if (packet.remaining() > 0) {
- throw new RpcClientException("RPC response length mismatch");
- }
- if (status != RpcStatusProto.SUCCESS) { // Rpc Request failed
- final String exceptionClassName = header.hasExceptionClassName() ?
- header.getExceptionClassName() :
- "ServerDidNotSetExceptionClassName";
- final String errorMsg = header.hasErrorMsg() ?
- header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
- final RpcErrorCodeProto erCode =
- (header.hasErrorDetail() ? header.getErrorDetail() : null);
- if (erCode == null) {
- LOG.warn("Detailed error code not set by server on rpc error");
- }
- RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode);
- if (status == RpcStatusProto.ERROR) {
- final Call call = calls.remove(callId);
- call.setException(re);
- } else if (status == RpcStatusProto.FATAL) {
- // Close the connection
- markClosed(re);
- }
- }
- } catch (IOException e) {
- markClosed(e);
- }
- }
看了接收逻辑,那么发送 RPC call 的逻辑也必不可少。有一点值得注意的是,发送 RPC call 都不是 connection 线程,所以这里需要一些线程同步方法。一般来说,会使用消息队列的方式来缓存 call,然后一个发送线程不断发送 call。不过 Hadoop 不是这样做的,它使用的是一个线程池,然后传输给线程池的是一个包装发送 Call 的 Runnable。为什么采用这种完全 task base 的方法,我也没太明白。不过话说回来,也没有明显的缺点,反而是把消息队列的工作扔给线程池了,减少了一定工作量。 这里简单的贴一点代码:
- public void sendRpcRequest(final Call call)
- throws InterruptedException, IOException {
- if (shouldCloseConnection.get()) {
- return;
- }
- // Serialize the call to be sent. This is done from the actual
- // caller thread, rather than the sendParamsExecutor thread,
- RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
- call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
- clientId);
- final ResponseBuffer buf = new ResponseBuffer();
- header.writeDelimitedTo(buf);
- RpcWritable.wrap(call.rpcRequest).writeTo(buf);
- synchronized (sendRpcRequestLock) {
- Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- synchronized (ipcStreams.out) {
- if (shouldCloseConnection.get()) {
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + " sending #" + call.id);
- }
- // RpcRequestHeader + RpcRequest
- ipcStreams.sendRequest(buf.toByteArray());
- ipcStreams.flush();
- }
- } catch (IOException e) {
- // exception at this point would leave the connection in an
- // unrecoverable state (eg half a call left on the wire).
- // So, close the connection, killing any outstanding calls
- markClosed(e);
- } finally {
- //the buffer is just an in-memory buffer, but it is still polite to
- // close early
- IOUtils.closeStream(buf);
- }
- }
- });
- }
- }
从代码片段中可以看出,一个 Server 类会存在一个 Listener 线程,一个 Responder 线程以及多个 Handler 线程。其中 Listener 线程是一个使用 NIO 的线程,接收所有的连接请求都是由 Listener 线程处理的。其实 Listener 线程内部还有多个 Reader 线程,Reader 线程的功能是处理 Accept 之后的连接,构造出 RpcCall 请求,然后扔到 CallQueueManager<Call> callQueue 这个队列中。然后 Handler 线程们从 callQueue 中取出 Call 并执行具体的 RPC。Handler 处理完之后,会以 NIO channel 的方式发送给 Responder,Responder 再实际发送给 Client 端。由于 Hadoop 实际的通信协议有很多种,这里也就不探讨 RpcInvoker 的具体逻辑了。主要就是通过反射调用对应的 call 方法实现,也不是很难理解。OK!这篇博客就到这了。再过几个小时就是 2018 年了,真的是时光如梭啊!未来的路通向哪里,我不知道,但我会加快脚步追寻光明。2018,你好!
- private CallQueueManager < Call > callQueue;
- // maintains the set of client connections and handles idle timeouts
- private Listener listener = null;
- private Responder responder = null;
- private Handler[] handlers = null;
来源: https://juejin.im/entry/5a48ea1051882525636302f7