RpcEndpoint
文档对 RpcEndpoint 的解释:
An end point for the RPC that defines what functions to trigger given a message. It is guaranteed that onStart, receive and onStop will be called in sequence. The life-cycle of an endpoint is: constructor -> onStart -> receive* -> onStop Note: receive can be called concurrently. If you want receive to be thread-safe, please use ThreadSafeRpcEndpoint If any error is thrown from one of RpcEndpoint methods except onError, onError will be invoked with the cause. If onError throws an error, RpcEnv will ignore it.
其子类继承关系如下:
其下面还有一个抽象子接口: ThreadSafeRpcEndpoint
文档对 ThreadSafeRpcEndpoint 的解释如下:
需要 RpcEnv 线程安全地向其发送消息的 trait. 线程安全意味着在通过相同的 ThreadSafeRpcEndpoint 处理一条消息完成后再处理下一个消息. 换句话说, 在处理下一条消息时, 可以看到对 ThreadSafeRpcEndpoint 的内部字段的更改, 并且 ThreadSafeRpcEndpoint 中的字段不需要是 volatile 或等效的. 但是, 不能保证同一个线程将为不同的消息执行相同的 ThreadSafeRpcEndpoint.
即顺序处理消息, 不能同时并发处理. traint RpcEndpoint 的方法如下:
对其变量和方法解释如下:
1. rpcEnv:RpcEndpoint 注册的那个 RpcEnv 对象
2. self : RpcEndpoint 对应的 RpcEndpointRef.onStart 方法被调用的时候, RpcEndpointRef 有效, onStop 调用后, self 会是 null, 注意由于在 onStart 之前, RpcEndpoint 还没有被注册, 还没有有效的 RpcEndpointRef, 所以不要在 onStart 之前调用 self 方法
3. receive : 处理从 RpcEndpointRef.send 或 RpcCallContext.reply 过来的消息, 如果接收到一个未匹配的消息, 会抛出 SparkException 并且发送给 onError 方法
4. receiveAndReply: 处理从 RpcEndpointRef.ask 发过来的消息, 如果接收到一个未匹配的消息, 会抛出 SparkException 并且发送给 onError 方法
5. onError: 在消息处理过程中, 如果有异常都会调用此方法
6. onConnected: 当 remoteAddress 连接上当前节点时被调用
7. onDisconnected: 当当前节点丢失掉 remoteAddress 后被调用
8. onNetworkError: 当连接当前节点和 remoteAddress 时, 有网络错误发生时被调用
9. onStart: 在 RpcEndpoint 开始处理其他消息之前被调用
10. onStop: 当 RpcEndpoint 停止时被调用, self 将会是 null, 不能用于发送消息
11. stop: 停止 RpcEndpoint
RpcEndPointRef
RpcEndPointRef: 远程的 RpcEndpoint 引用, RpcEndpointRef 是线程安全的.
有一个跟 RpcEndPoint 很像的类 -- RpcEndPointRef. 先来看 RpcEndpointRef 抽象类. 下面我们重点来看一下它内部构造.
首先看它的继承结构:
它的父类是 RpcEndpointRef. 先来剖析它的内部变量和方法的解释:
有三个成员变量:
1. maxRetries: 最大尝试连接次数. 可以通过 spark.rpc.numRetries 参数来指定, 默认是 3 次. 该变量暂时没有使用.
2. retryWaitMs: 每次尝试连接最大等待毫秒值. 可以通过 spark.rpc.retry.wait 参数来指定, 默认是 3s. 该变量暂时没有使用.
3. defaultAskTimeout: spark 默认 ask 请求操作超时时间. 可以通过 spark.rpc.askTimeout 或 spark.network.timeout 参数来指定, 默认是 120s.
成员方法:
1. address : 抽象方法, 返回 RpcEndpointRef 的 RpcAddress
2. name: 抽象方法, 返回 endpoint 的 name
3. send: 抽象方法, Sends a one-way asynchronous message. Fire-and-forget semantics. 发送单向的异步消息, 满足 即发即忘 语义.
4. ask: 抽象方法. 发送消息到相应的 RpcEndpoint.receiveAndReply , 并返回 Future 以在默认超时内接收返回值. 它有两个重载方法: 其中没有 RpcTimeOut 的 ask 方法添加一个 defaultAskTimeout 参数继续调用 有 RpcTimeOut 的 ask 方法.
5. askSync: 调用抽象方法 ask. 跟 ask 类似, 有两个重载方法: 其中没有 RpcTimeOut 的 askSync 方法添加一个 defaultAskTimeout 参数继续调用 有 RpcTimeOut 的 askSync 方法. 有 RpcTimeOut 的 askSync 方法 会调用 ask 方法生成一个 Future 对象, 然后等待任务执行完毕后返回.
注意, 这里面其实就涉及到了模板方法模式. ask 跟 askSync 都是设定好了, ask 要返回一个 Future 对象, askSync 则是 调用 ask 返回的 Future 对象, 然后等待 future 的 result 方法返回.
下面看 RpcEndpointRef 的唯一实现类 - NettyRpcEndpointRef
RpcEndpointRef 的 NettyRpcEnv 版本. 此类的行为取决于它的创建位置. 在 "拥有"RpcEndpoint 的节点上, 它是 RpcEndpointAddress 实例的简单包装器. 在接收序列化版本引用的其他计算机上, 行为会发生变化. 实例将跟踪发送引用的 TransportClient, 以便通过客户端连接发送到端点的消息, 而不需要打开新连接. 此 ref 的 RpcAddress 可以为 null; 这意味着 ref 只能通过客户端连接使用, 因为托管端点的进程不会侦听传入连接. 不应与第三方共享这些引用, 因为它们将无法向端点发送消息.
先来看 成员变量:
1. conf : 是一个 SparkConf 实例
2. endpointAddress: 是一个 RpcEndpointAddress 实例, 主要包含了 RpcAddress (host 和 port) 和 rpc endpoint name 的信息
3. nettyEnv: 是一个 NettyRpcEnv 实例
4. client: 是一个 TransportClient 实例, 这个 client 是不参与序列化的.
成员方法:
1. 实现并重写了继承自超类的 ask 方法, 如下:
2. 实现并重写了继承自超类的 send 方法, 如下:
3. 关于序列化和反序列化的两个方法: writeObject(序列化方法) 和 readObject(反序列化方法), 如下:
RequestMessage
顺便, 我们来看 RequestMessage 对象, 代码如下:
RequestMessage 里面的消息是 sender 发给 receiver 的, RequestMessage 主要负责 sender RpcAddress, receiver RpcAddress,receiver rpcendpoint name 以及 消息 content 的序列化.
总结: 本文主要剖析了 RpcEndpoint 和 RpcEntpointRef 两个类, 顺便, 也介绍了支持序列化的 RequestMessage 类.
注: 到目前为止, Spark RPC 组件还没有全部剖析完毕, 预计还有三到四篇文章才能完全剖析完, be patient ??
来源: http://www.bubuko.com/infodetail-3113659.html