背景
[作者: DeepLearningStack, 阿里巴巴算法工程师, 开源 TensorFlow Contributor]
在 TensorFlow 源码中我们经常能看到一个奇怪的词 --Rendezvous. 如果从仔细统计该单词出现的频率和模块, 你会发现无论在单机还是分布式, 无论在 core 目录还是 contrib 目录都存在它的身影, 所涉及的模块非常多. Rendezvous 是一个法语单词, 发音也比较特殊, 一般直译为 "约会, 相会, 会和", 而在 TensorFlow 中, Rendezvous 是用来完成消息传输的通信组件. 大部分源码读者在起初阅读时通信部分的代码时可能会觉得有点懵圈, 为什么不使用 Communicator 这样简单明了的单词来表明通信过程, 反而使用这样一个晦涩的法语词作为抽象呢? 其实在了解 TensorFlow 消息通信的原理后就会发现, 使用 Rendezvous 作为这一过程的抽象是非常贴切的.
因为 Rendezvous 所涉及的模块组件较多, 为了让读者循序渐进地理解 TensorFlow 中的通信机制, 决定将 Rendezvous 分成多个系列, 由浅入深分开梳理. 这样做的目的不但能让读者阅读时对整体层次结构有较好的把握, 而且简短的篇幅也便于阅读, 所以建议读者按顺序阅读本系列. 本文是 TensorFlow 通信机制系列的第一篇文章, 侧重整体结构和本地传输通信的梳理.
消息传输的唯一标识符 --ParsedKey
在 TensorFlow 中无论是单机还是分布式都涉及到消息传输, 并且消息传输总是从发送端 Send, 接收端 Recv. 那么这里就存在一个消息的对应问题: 在多组消息同时发送接收时, 需要对每一对 Send 和 Recv 梳理一个对应关系, 即 Send 端发送的消息与 Recv 端接收的消息不能有错位. 如果 Recv 端本打算接收的消息是 A, 但由于消息对应错误导致接收到了 B, 那么整个训练过程就会出现错误. 其实解决这个问题也非常简单, 因为每一对 Send 和 Recv 所处理消息都是同一个, 所以只要让某个消息在被 Send 前加上一个唯一标识符, 而 Recv 在接收消息前也能够按照某种规则拼出一样的唯一标识符, 这个对应关系就完美解决了. 在 TensorFlow 中确实定义了这样一种标识符, 它就是结构体 ParsedKey.
ParsedKey 结构体
在 tensorflow/core/framework/rendezvous.h 的 Rendezvous 类内定义了结构体 ParsedKey, 它内容非常简短却又十分全面, 不但包含了消息传输的所有必须的内容, 还具备唯一性, 在我们直接分析其源代码结构.
- // Parses the key constructed by CreateKey and parse src/dst device
- // names into structures respectively.
- struct ParsedKey {
- StringPiece src_device;
- DeviceNameUtils::ParsedName src;
- uint64 src_incarnation = 0;
- StringPiece dst_device;
- DeviceNameUtils::ParsedName dst;
- StringPiece edge_name;
- ParsedKey() {}
- ParsedKey(const ParsedKey& b) { *this = b; }
- ParsedKey& operator=(const ParsedKey& b);
- StringPiece FullKey() const { return buf_; }
- private:
- friend class Rendezvous;
- friend class SendOp;
- friend class RecvOp;
- string buf_;
- };
可以看到其结构非常简单, 一个完备的 ParsedKey 要包括六个部分.
src_device: 消息发送源的字符串信息, 形如 / job:localhost/replica:0/task_id:0/device:GPU:0
src: 和 src_device 的信息量相同, 只不过是结构体的表示方法
src_incarnation: 一般来说这个字段没有什么作用, 但是当某个 worker 重启后, 该值会发生变化, 用来和之前挂掉的 worker 做区分, 这便于 debug
dst_device: 消息发送的接收方字符串信息, 格式和 src_device 相同
dst: 和 dst_device 的信息量相同, 只不过是结构体的表示方法
edge_name: 这个字段是该 Key 最特殊的地方, 它可以灵活指定为任何字符串, 实现不同 Key 的区分. 比如它可以是 Tensor 的名字, 也可以是具有某种特殊意义的固定字符串
CreateKey 过程与 ParseKey 过程
一般情况下, 在 TensorFlow 中应该优先使用 CreateKey 函数来构造可以解析的 Key 字符串, 然后经过 ParseKey 过程将该字符串的每个信息解析到 ParsedKey 结构体中, 之所以使用 CreateKey 函数构造 Key 字符串是因为这是最安全保险的方式, 下面是 CreateKey 函数构造 Key 字符串的过程展现.
CreateKey 只要接受五个参数即可安全构造字符串形式的 Key, 这里面特殊之处有两个, a. 参数中 frame_and_iter 一般直接取自 OpKernelContext 中的 FrameAndIter 对象; b. src_incarnation 要做一个十六进制的字符串转换. CreateKey 函数的输出是以分号 (";") 为分隔符的字符串, 该字符串同样包含五个域. CreateKey 是一个 static 函数, 代码比较简单, 就不在这里列出. 随后我们这个字符串传入 ParseKey 函数即可完成结构体 ParsedKey 的解析, 解析过程如下.
ParseKey 对输入字符串的前四个域做了映射, 抛弃了第五个域, 但是在提供 Key 字符串时需要提供完整的五个域, 否则会检查报错. 和 CreateKey 相同, ParseKey 过程也是一个 static 函数, 代码如下所示.
- /* static */
- Status Rendezvous::ParseKey(StringPiece key, ParsedKey* out) {
- if (key.data() == out->buf_.data()) {
- // Caller used our buf_ string directly, so we don't need to copy. (The
- // SendOp and RecvOp implementations do this, for example).
- DCHECK_EQ(key.size(), out->buf_.size());
- } else {
- // Make a copy that our StringPieces can point at a copy that will persist
- // for the lifetime of the ParsedKey object.
- out->buf_.assign(key.data(), key.size());
- }
- StringPiece s(out->buf_);
- StringPiece parts[5];
- for (int i = 0; i <5; i++) {
- parts[i] = ConsumeNextPart(&s, ';');
- }
- if (s.empty() && // Consumed the whole string
- !parts[4].empty() && // Exactly five parts
- DeviceNameUtils::ParseFullName(parts[0], &out->src) &&
- strings::HexStringToUint64(parts[1], &out->src_incarnation) &&
- DeviceNameUtils::ParseFullName(parts[2], &out->dst) &&
- !parts[3].empty()) {
- out->src_device = StringPiece(parts[0].data(), parts[0].size());
- out->dst_device = StringPiece(parts[2].data(), parts[2].size());
- out->edge_name = StringPiece(parts[3].data(), parts[3].size());
- return Status::OK();
- }
- return errors::InvalidArgument("Invalid rendezvous key:", key);
- }
- Rendezvous
在了解 ParsedKey 之后, 我们就可以窥探 Rendezvous 这个类的内部结构和实现了. 最基本的 Rendezvous 类被定义在了 tensorflow/core/framework/rendezvous.h 文件中, 它对外提供了最基本的 Send,Recv 和 RecvAsync 接口和实现. 总体来说这个类还是比较抽象的, 在不同的通信场景下需要提供不同的实现. 比如对于本地传输来说, TensorFlow 提供了 LocalRendezvous 和 IntraProcessRendezvous 实现类, 对于使用跨进程通信场景来说, TensorFlow 提供了 RemouteRendezvous 实现系列. 不同通信场景的实现细节差别相当大, 所以本系列将对这些做逐个梳理, 本文只关注本地传输部分. 如果对跨进程传输感兴趣, 那么请关注该系列的下一篇文章. Rendezvous 类中最重要的函数是 Send 和 Recv 系列, 它们的签名和注释如下代码所示.
- // The caller is a tensor producer and it sends a message (a tensor
- // "val" and a bool "is_dead") under the given "key".
- //
- // {val, is_dead} is bundled as a message sent and received.
- // Typically, is_dead is set by some control flow nodes
- // (e.g., a not-taken branch). args is passed by Send to the
- // Recv function to communicate any information that the Recv
- // function might need. This is typically only necessary for
- // Send/Recv on the same worker.
- //
- // Send() never blocks.
- virtual Status Send(const ParsedKey& key, const Args& args, const Tensor& val, const bool is_dead) = 0;
- virtual void RecvAsync(const ParsedKey& key, const Args& args, DoneCallback done) = 0;
- // Synchronous wrapper for RecvAsync.
- Status Recv(const ParsedKey& key, const Args& args, Tensor* val, bool* is_dead, int64 timeout_ms);
- Status Recv(const ParsedKey& key, const Args& args, Tensor* val, bool* is_dead);
TensorFlow 中的 Recv 有两种, 一种是同步版本, 换一种是异步版本. 通常情况下为了计算和通信的 overlap,TensorFlow 广泛使用了 RecvAsync 函数. 并且在后面一节中我们可以知道, Send 过程并不是真的参与数据通信, 所有的通信过程均由 RecvAsync 完成.
Rendezvous 相关类结构
在了解通信过程之前, 应该先熟悉下 Rendezvous 相关的类结构. 下面的类图展示了当期 TensorFlow 系统中所有的 Rendezvous 相关类图结构.
所有的 Rendezvous 相关类都以 Rendezvous 基类为核心, LocalRendezvous 和 IntraProcessRendezvous 是我们本文分析的重点, SimpleRendezvous 实现非常简单, 读者可以在熟悉前两个实现之后自行分析该类. 而 BaseRemoteRendezvous 类以及相关类是跨进程通信相关的组件, 这部分内容将在下一篇文章中分析.
Rendezvous 基类中的 Recv 函数
因为 Recv 函数只是 RecvAsync 函数的同步版本封装, 因此在每个实现类继承重新函数时, 只需要提供 Send 函数的实现和 RecvAsync 函数实现即可, 下面的代码是 Rendezvous 基类中同步版本实现.
- Status Rendezvous::Recv(const ParsedKey& key, const Args& recv_args,
- Tensor* val, bool* is_dead, int64 timeout_ms) {
- Status ret;
- Notification n;
- RecvAsync(key, recv_args,
- [&ret, &n, val, is_dead](const Status& s, const Args& send_args,
- const Args& recv_args, const Tensor& v,
- const bool dead) {
- ret = s;
- *val = v;
- *is_dead = dead;
- n.Notify();
- });
- if (timeout_ms> 0) {
- int64 timeout_us = timeout_ms * 1000;
- bool notified = WaitForNotificationWithTimeout(&n, timeout_us);
- if (!notified) {
- return Status(error::DEADLINE_EXCEEDED,
- "Timed out waiting for notification");
- }
- } else {
- n.WaitForNotification();
- }
- return ret;
- }
可以看出, 无论 RecvAsync 的实现内容是什么, Recv 函数都可以将 RecvAsync 视为黑盒, 在其上层封装成为与 RecvAsync 相同实现的同步函数版本.
本地传输过程
使用本地传输过程包括 LocalRendezous 和 IntraProcessRendezvous 两个实现类, 但是后者是前者的封装, 因此本文分析的重点在于 LocalRendezvous 实现类.
消息队列的缓存 --Table
在 TensorFlow 中, 几乎每个 Rendezvous 实现类都有自己的消息队列缓存, 而几乎每种消息队列缓存都是依靠 Table 实现的. Rendezvous 的发送 (Send) 和接收 (Recv) 都将通过 Table 完成, 这完美地阐释了 "约会, 相会, 会和" 的释义, 这也是为什么 TensorFlow 使用这样一个法语词来抽象通信过程. 下图形象化的表示了 Table 以及 Table 中的每个 Item.
在 LocalRendezvous 实现类中, Send 端和 Recv 端使用的是同一个 Rendezvous 对象, 所以他们共享同一个 Table, 所以 Table 属于临界资源, 应该加锁形成互斥访问. Item 这个结构中其实有很多内容, 在上图中只解释两个比较重要的部分.
Value: 这就是参与通信 Tensor 本体
Waitor: 这是在确认 Tensor 被接收端完成接收后的处理函数, 也就是 consumer 处理该 Tensor 的函数过程
传输过程分析
无论是 Send 过程还是 Recv 过程, 它们都将借助 Table 完成 Tensor 的转发. Send 过程作为 Tensor 的生产者, 它负责将待发送的 Tensor 送入 Table 中, 并将 ParsedKey 作为该 Item 的键. 而 Recv 过程作为消费者, 它也会根据自己所需拼出相同的 ParsedKey, 然后从 Table 中查看是否已经存在该项.
应该注意的是, Tensor 虽然由 Send 端生产, 但是 Table 中的 Item 却不一定是由 Send 端插入. 因为在 TensorFlow 中, Send 和 RecvAsync 二者的相对顺序是不能保证先后的, 经常出现需求比供给在时间片上先到的情况, 那么这时就会出现 RecvAsync 先拼出了 ParsedKey 然后立即查表的情况. 应对这种情况的一种方案是, RecvAsync 放弃此次查询, 开启另一个线程轮询该表直到 Send 端产生为止, 然后执行 consumer 的 waiter 函数, 但这是一个非常消耗资源的实现方式. TensorFlow 为了保证异步性, 使用另一种无需 CPU 轮询消耗资源的实现方式.
我们知道, 在 Send 和 RecvAsync 顺序相对异步的情况下, waitor 函数的执行时机只有两种情况, 它取决于 Send 的供给和 RecvAsync 的需求哪一个先到达. 若生产者先到达, 那么 waiter 函数的调用由 RecvAsync 执行. 若消费者的需求先到达, 那么 waiter 函数的调用由 Send 执行. 简而言之, 总是迟到的一方执行 waiter 函数. 那么可以这样设计: 和 Send 端相同, 允许 RecvAsync 将所需的 Item 插入到 Table 中, 并连同 waiter 函数一起发送到该表里. 如果 Send 端后到达, 那么 Send 函数将从表中取出该 Item, 并执行 waiter 函数, 反之, 则由 RecvAsync 函数取出自己所需要的 Item, 然后执行 waiter 函数, 下面的图展示了这个过程.
Send 过程源码
了解上述的过程后, 我们可以直接看 Send 函数的源码了. 下面是 LocalRendezvous 的 Send 函数源码展示.
- Status Send(const ParsedKey& key, const Args& send_args, const Tensor& val,
- const bool is_dead) override {
- uint64 key_hash = KeyHash(key.FullKey());
- VLOG(2) <<"Send" << this << "" << key_hash <<" " << key.FullKey();
- mu_.lock();
- if (!status_.ok()) {
- // Rendezvous has been aborted.
- Status s = status_;
- mu_.unlock();
- return s;
- }
- ItemQueue* queue = &table_[key_hash];
- if (queue->empty() || queue->front()->IsSendValue()) {
- // There is no waiter for this message. Append the message
- // into the queue. The waiter will pick it up when arrives.
- // Only send-related fields need to be filled.
- Item* item = new Item;
- item->value = val;
- item->is_dead = is_dead;
- item->send_args = send_args;
- if (item->send_args.device_context) {
- item->send_args.device_context->Ref();
- }
- queue->push_back(item);
- mu_.unlock();
- return Status::OK();
- }
- // There is an earliest waiter to consume this message.
- Item* item = queue->front();
- queue->pop_front();
- mu_.unlock();
- // Notify the waiter by invoking its done closure, outside the
- // lock.
- DCHECK(!item->IsSendValue());
- item->waiter(Status::OK(), send_args, item->recv_args, val, is_dead);
- delete item;
- return Status::OK();
- }
RecvAsync 过程源码
下面是 LocalRendezvous 的 RecvAsync 函数源码展示.
- void RecvAsync(const ParsedKey& key, const Args& recv_args,
- DoneCallback done) override {
- uint64 key_hash = KeyHash(key.FullKey());
- VLOG(2) <<"Recv" << this << "" << key_hash <<" " << key.FullKey();
- mu_.lock();
- if (!status_.ok()) {
- // Rendezvous has been aborted.
- Status s = status_;
- mu_.unlock();
- done(s, Args(), recv_args, Tensor(), false);
- return;
- }
- ItemQueue* queue = &table_[key_hash];
- if (queue->empty() || !queue->front()->IsSendValue()) {
- // There is no message to pick up.
- // Only recv-related fields need to be filled.
- Item* item = new Item;
- item->waiter = std::move(done);
- item->recv_args = recv_args;
- if (item->recv_args.device_context) {
- item->recv_args.device_context->Ref();
- }
- queue->push_back(item);
- mu_.unlock();
- return;
- }
- // A message has already arrived and is queued in the table under
- // this key. Consumes the message and invokes the done closure.
- Item* item = queue->front();
- queue->pop_front();
- mu_.unlock();
- // Invokes the done() by invoking its done closure, outside scope
- // of the table lock.
- DCHECK(item->IsSendValue());
- done(Status::OK(), item->send_args, recv_args, item->value, item->is_dead);
- delete item;
- }
关于 IntraProcessRendezvous 的 Send 和 RecvAsync 函数
其实本质上 IntraProcessRendezvous 和 LocalRendezvous 是同一个函数实现, 只是前者对后者做了一层封装. 我们从源码中看到, LocalRendezvous 是 IntraProcessRendezvous 的成员之一, 只是在回调函数中多了一些简单的处理而已, 比如它会仔细考量 Tensor 的生产方和消费方是存在于 CPU 还是 GPU, 是否可以通过 P2P 直接拷贝, 还是需要通过 Host 做中转, 关于拷贝过程使用的是下面的函数, 其他地方大同小异, 因此不再赘述. 有兴趣的读者可以到 tensorflow/core/common_runtime / 目录下参考 rendezvous_mgr.h,rendezvous_mgr.cc 和 copy_tensor.h 与 copy_tensor.cc 这几个文件.
- // Copies "input" to "output" between devices accessible to the
- // local process via some DMA-like method. "edge_name" is the name
- // of the tensor being copied, for debugging purposes. Depending on
- // the type of devices and memory in use, the copy may be performed
- // synchronously or asynchronously. 'done' will be invoked only
- // after the copy is actually complete.
- static void ViaDMA(StringPiece edge_name, DeviceContext* send_dev_context,
- DeviceContext* recv_dev_context, Device* src, Device* dst,
- const AllocatorAttributes src_alloc_attr,
- const AllocatorAttributes dst_alloc_attr,
- const Tensor* input, Tensor* output,
- int dev_to_dev_stream_index, StatusCallback done);
总结
本文是 TensorFlow 通信机制系列的第一篇文章, 先通过抛出高并发情况下消息通信两端的对应问题引出 TensorFlow 中的 ParsedKey 结构设计的必要性, 然后给出了 Rendezvous 全局类图, 最后详细的分析了 LocalRendezvous 的消息传输实现过程. TensorFlow 的通信机制的完美的阐释了 Rendezvous 一词的含义 -- 无论是 Send 端还是 Recv 端都需要在临界资源 Table 中 "约会", 进行消息的传输. 随后还着重分析了异步情况下, 本属于 consumer 的 waiter 函数调用时机设计问题 -- 为了保证 waiter 函数的执行不被阻塞, 从设计上采取 Late invoke 的方案. IntraProcessRendezous 本质是 LocalRendezvous 的一层封装, 它在数据拷贝上面做了更多的工作, 借助 LocalRendezvous 实现了 Send 和 Recv 处于不同或相同种类 Device 情况下, 对上层完全透明的拷贝过程. 由于篇幅原因, 特意将 TensorFlow 通信机制分为多个系列分析, 作为第一篇文章, 本篇介绍了 Rendezvous 的基本框架. 在该系列之后的文章中, 还会对跨进程的通信进行详细地分析.
来源: https://www.cnblogs.com/deep-learning-stacks/p/10354258.html