ps-lite_part4_customer讲解

2,886次阅读
没有评论

共计 3784 个字符,预计需要花费 10 分钟才能阅读完成。

距离上一次发文又间隔了一段时间,都有点忘记写到哪一部分了,先回顾一下之前写到哪里了

  1. 介绍ps-lite的基本概念 https://www.deeplearn.me/4302.html
  2. 介绍ps-lite核心组成 postOffice https://www.deeplearn.me/4303.html
  3. 介绍ps-lite 通信模块van https://www.deeplearn.me/4306.html

本篇文章主要讲解 customer。customer的角色主要干的事情其实是一个中介的事情,它主要负责server 和worker之间的一些信息传递。

我们先看看 customer 在哪里被创建使用?

在test/test_kv_app_multi_workers.cc文件中我们看下如何调用的,先看下server端调用

void StartServer() {
  if (!IsServer()) return;
  //重点是kvserver的初始化,传递的参数是app_id
  auto server = new KVServer<float>(0);
  server->set_request_handle(KVServerDefaultHandle<float>());
  RegisterExitCallback([server](){ delete server; });
}

对于一个server来说app_id可以理解为是kv数据库的id,这个在之前有提到过,kvsever的初始化函数如下所示:

 explicit KVServer(int app_id) : SimpleApp() {
    using namespace std::placeholders;
    obj_ = new Customer(app_id, app_id, std::bind(&KVServer<Val>::Process, this, _1));
  }

从上面就可以清晰的看到customer被创建的过程,直接调用customer初始化方法。这里有一点需要注意的是customer这个初始化参数第一个和第二个参数传递的都是一个值,如果是这样的话是不是意味着server端就一个customer?

这个其实是对比worker端而言,下面放一下woker端customer初始化对比

  explicit KVWorker(int app_id, int customer_id) : SimpleApp() {
    using namespace std::placeholders;
    slicer_ = std::bind(&KVWorker<Val>::DefaultSlicer, this, _1, _2, _3);
    obj_ = new Customer(app_id, customer_id, std::bind(&KVWorker<Val>::Process, this, _1));
  }

这么一对比就发现差异蛮大的,kvworker的初始化需要指定传递app_id和customer_id的,那么意味着app-id和customer_id是一对多的关系。

以上只是看一下customer在哪了被调用,以及在worker端和server端之间的差异。

Customer

构造函数

接下来就是看下customer的真面貌来!!!还是先看下customer的构造函数:

Customer::Customer(int app_id, int customer_id, const Customer::RecvHandle& recv_handle)
    : app_id_(app_id), customer_id_(customer_id), recv_handle_(recv_handle) {
  Postoffice::Get()->AddCustomer(this);
  recv_thread_ = std::unique_ptr<std::thread>(new std::thread(&Customer::Receiving, this));
}

首先看下传参,app_id和customer_id就不多解释了,const Customer::RecvHandle& recv_handle 这个参数的含义是表示接受到消息之后要对消息做什么处理,一般情况下这个是由kvserver和kvworker初始化的时候传递给customer的,也就是说这些处理逻辑本身跟customer没啥关系,由实际需求方自己定义。

举个最简单的例子,server端拿到worker端梯度信息要去更新参数,这个处理逻辑就是 const Customer::RecvHandle& recv_handle 这个要干的事情。

构造函数做了哪些事情?

  1. 调用PostOffice::AddCustomer将当前Customer注册到PostOffice;
    1. PostOffice的customers成员: 在对应的app_id的元素上添加custom_id;
    2. PostOffice的barrier_done成员将该custom_id的同步状态设为false
  2. 新起一个接收消息的线程,不断的从外界接收消息并且根据实际的处理逻辑做出相应的处理。

消息处理

生产和消费模式

Customer的消息处理遵循“producer+consumer”模式,之前也提到customer只是个中介,那么它这个生产和消费是如何进行的?

生产

核心的API是Accept函数

 inline void Accept(const Message& recved) {
    recv_queue_.Push(recved);
  }

recv_queue_就是一个队列,存储调用房发送过来的消息,对于customer而言上游发送过来的消息就是生产出来的内容,customer拿到这个也只是存起来。

消费

核心函数就是这个 Receiving 成员函数,负责消息的consumer

void Customer::Receiving() {
  while (true) {
    Message recv;
    //从队列中取出相应的数据
    recv_queue_.WaitAndPop(&recv);
    //判断当前的消息control指令是不是结束的指令,如果是则直接结束
    if (!recv.meta.control.empty() &&
        recv.meta.control.cmd == Control::TERMINATE) {
      break;
    }
    //调用的制定的处理函数来消费内容
    recv_handle_(recv);
    //判断当前的消息是请求和响应,如果是响应则修改tracker_数据
    //tracker是负责核对消息完整性,比如你发送3个请求,那么收到3个响应才算完整,那么这个参数就是干这个事
    if (!recv.meta.request) {
      std::lock_guard<std::mutex> lk(tracker_mu_);
      tracker_[recv.meta.timestamp].second++;
      tracker_cond_.notify_all();
    }
  }
}

请求相关

比如kvworker需要去server拉取参数,那么就会经过customer来进行的消息请求,由customer新建请求去server拉取数据。

int Customer::NewRequest(int recver) {
  std::lock_guard<std::mutex> lk(tracker_mu_);
  //这里是获取需要通知的nodeid下的节点个数,比如worker向server请求,那么这就是所有需要参与处理server的节点数
  int num = Postoffice::Get()->GetNodeIDs(recver).size();
  //构建一个tracker_ 用于校对请求的完整星,这个在Receiving函数里也有体现
  tracker_.push_back(std::make_pair(num, 0));
  return tracker_.size() - 1;
}

上面也提到一个worker会从多个server拉取数据,那么什么时候才能保证数据都获取到呢,那么有的处理响应快,有的慢,怎么保证数据的完整性,那么下面的这个函数就是做这个事情

void Customer::WaitRequest(int timestamp) {
  std::unique_lock<std::mutex> lk(tracker_mu_);
  tracker_cond_.wait(lk, [this, timestamp]{
      return tracker_[timestamp].first == tracker_[timestamp].second;
    });
}

就是用tracker_cond_阻塞等待“请求节点数”和“回复节点数”相等。而tracker_cond_是在Customer::Receiving每次接受到消息时通知一下。

注意这个阻塞是“单机阻塞”,适用于比如,worker必须在从server拉取完最新参数之后,才能开始下一轮的训练。由于pull是异步的,所以worker需要调用WaitRequest阻塞等待那个pull request完成。多机之间的阻塞同步,要用PostOffice:Barrier

总结

主要讲解了customer的作用–中介,同时作为中介,它是如何进行消息的生产与消费,最后也提到了消息的同步这块的知识。

正文完
请博主喝杯咖啡吧!
post-qrcode
 
admin
版权声明:本站原创文章,由 admin 2023-03-22发表,共计3784字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
验证码