ps-lite_part3_Van讲解

3,980次阅读
没有评论

共计 17000 个字符,预计需要花费 43 分钟才能阅读完成。

学习还是要继续,在上一篇文章讲解PostOffice当中其实也提到了Van,但是那一篇文章主要还是在讲PostOffice相关的信息,所以Van 暂时搁置放到这里。还是延续之前的学习的思路,按照ps服务启动的顺序,到那个模块加载的时候,在重点介绍这块的信息。一句题外话,其实在写这些东西的时候,也是会看很多大佬们的讲解,单靠自己去看源码,很多时候也看不懂,也许c++代码看懂了,但是为什么这么设计?所以写的有些不足或者错误也欢迎email我。

ok,那我们先回顾一下Van是在哪里被调用的,是在Postoffice的启动过程中。InitEnvironment 负责创建,van_->Start(customer_id) 负责启动。

void Postoffice::InitEnvironment() {
  const char* val = NULL;
  std::string van_type = GetEnv("DMLC_PS_VAN_TYPE", "zmq");
  van_ = Van::Create(van_type);
  val = CHECK_NOTNULL(Environment::Get()->find("DMLC_NUM_WORKER"));
  num_workers_ = atoi(val);
  val =  CHECK_NOTNULL(Environment::Get()->find("DMLC_NUM_SERVER"));
  num_servers_ = atoi(val);
  val = CHECK_NOTNULL(Environment::Get()->find("DMLC_ROLE"));
  std::string role(val);
  is_worker_ = role == "worker";
  is_server_ = role == "server";
  is_scheduler_ = role == "scheduler";
  verbose_ = GetEnv("PS_VERBOSE", 0);
}

void Postoffice::Start(int customer_id, const char* argv0, const bool do_barrier) {
  start_mu_.lock();
  if (init_stage_ == 0) {
    InitEnvironment();
    // init glog
    if (argv0) {
      dmlc::InitLogging(argv0);
    } else {
      dmlc::InitLogging("ps-lite\0");
    }

    // init node info.
    for (int i = 0; i < num_workers_; ++i) {
      int id = WorkerRankToID(i);
      for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup,
                    kWorkerGroup + kScheduler,
                    kWorkerGroup + kServerGroup + kScheduler}) {
        node_ids_[g].push_back(id);
      }
    }

    for (int i = 0; i < num_servers_; ++i) {
      int id = ServerRankToID(i);
      for (int g : {id, kServerGroup, kWorkerGroup + kServerGroup,
                    kServerGroup + kScheduler,
                    kWorkerGroup + kServerGroup + kScheduler}) {
        node_ids_[g].push_back(id);
      }
    }

    for (int g : {kScheduler, kScheduler + kServerGroup + kWorkerGroup,
                  kScheduler + kWorkerGroup, kScheduler + kServerGroup}) {
      node_ids_[g].push_back(kScheduler);
    }
    init_stage_++;
  }
  start_mu_.unlock();

  // start van
  van_->Start(customer_id);

  start_mu_.lock();
  if (init_stage_ == 1) {
    // record start time
    start_time_ = time(NULL);
    init_stage_++;
  }
  start_mu_.unlock();
  // do a barrier here
  if (do_barrier) Barrier(customer_id, kWorkerGroup + kServerGroup + kScheduler);
}

Van 的创建

我们先看看Van对象的创建

Van* Van::Create(const std::string& type) {
  if (type == "zmq") {
    return new ZMQVan();
  } else if (type == "p3") {
    return new P3Van();
#ifdef DMLC_USE_IBVERBS
} else if (type == "ibverbs") {
    return new IBVerbsVan();
#endif
  } else {
    LOG(FATAL) << "Unsupported van type: " << type;
    return nullptr;
  }
}

根据初始化传递的type参数,我们也可以判断出它是一个通信模块,zmq是一个开源的socket编程库,ZMQVan也是基于zmq的Van库,至于下面的p3和ibverbs是额外的两种方式,ibverbs是字节提交的库,有兴趣的可以研究下,对Van本身对理解影响不大。

前面也说了,程序启动分三次,分别是scheduler、server和worker的启动,这里还是先以scheduler这个角色启动为代表讲,跟之前一样,在实际的讲解过程中也会拓展到其他角色。

Van 的启动

先看下代码

void Van::Start(int customer_id) {
  // get scheduler info
  start_mu_.lock();

  if (init_stage == 0) {
    scheduler_.hostname = std::string(
        CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
    scheduler_.port =
        atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
    scheduler_.role = Node::SCHEDULER;
    scheduler_.id = kScheduler;
    is_scheduler_ = Postoffice::Get()->is_scheduler();

    // get my node info
    if (is_scheduler_) {
      my_node_ = scheduler_;
    } else {
      auto role = Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER;
      const char* nhost = Environment::Get()->find("DMLC_NODE_HOST");
      std::string ip;
      if (nhost) ip = std::string(nhost);
      if (ip.empty()) {
        const char* itf = Environment::Get()->find("DMLC_INTERFACE");
        std::string interface;
        if (itf) interface = std::string(itf);
        if (interface.size()) {
          GetIP(interface, &ip);
        } else {
          GetAvailableInterfaceAndIP(&interface, &ip);
        }
        CHECK(!interface.empty()) << "failed to get the interface";
      }
      int port = GetAvailablePort();
      const char* pstr = Environment::Get()->find("PORT");
      if (pstr) port = atoi(pstr);
      CHECK(!ip.empty()) << "failed to get ip";
      CHECK(port) << "failed to get a port";
      my_node_.hostname = ip;
      my_node_.role = role;
      my_node_.port = port;
      // cannot determine my id now, the scheduler will assign it later
      // set it explicitly to make re-register within a same process possible
      my_node_.id = Node::kEmpty;
      my_node_.customer_id = customer_id;
    }

    // bind.
    my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40);
    PS_VLOG(1) << "Bind to " << my_node_.DebugString();
    CHECK_NE(my_node_.port, -1) << "bind failed";

    // connect to the scheduler
    Connect(scheduler_);

    // for debug use
    if (Environment::Get()->find("PS_DROP_MSG")) {
      drop_rate_ = atoi(Environment::Get()->find("PS_DROP_MSG"));
    }
    // start receiver
    receiver_thread_ =
        std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
    init_stage++;
  }
  start_mu_.unlock();

  if (!is_scheduler_) {
    // let the scheduler know myself
    Message msg;
    Node customer_specific_node = my_node_;
    customer_specific_node.customer_id = customer_id;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::ADD_NODE;
    msg.meta.control.node.push_back(customer_specific_node);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }

  // wait until ready
  while (!ready_.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }

  start_mu_.lock();
  if (init_stage == 1) {
    // resender
    if (Environment::Get()->find("PS_RESEND") &&
        atoi(Environment::Get()->find("PS_RESEND")) != 0) {
      int timeout = 1000;
      if (Environment::Get()->find("PS_RESEND_TIMEOUT")) {
        timeout = atoi(Environment::Get()->find("PS_RESEND_TIMEOUT"));
      }
      resender_ = new Resender(timeout, 10, this);
    }

    if (!is_scheduler_) {
      // start heartbeat thread
      heartbeat_thread_ =
          std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
    }
    init_stage++;
  }
  start_mu_.unlock();
}

代码首先在获取scheduler的基础信息

//获取host    
scheduler_.hostname = std::string(
        CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
//获取端口
    scheduler_.port =
        atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
//赋值 SCHEDULER 角色名 
    scheduler_.role = Node::SCHEDULER;
// kScheduler=1 ,之前讲解nodeid的时候提到过 1,2,4分别代表相应的group
    scheduler_.id = kScheduler;
// 判断当前的节点是否是 SCHEDULER 节点,这个也好理解,我们不同角色启动程序,只有在 SCHEDULER 角色启动这个才是 true
    is_scheduler_ = Postoffice::Get()->is_scheduler();

Van的成员里面有个my_node是记录自己的node节点信息,接下来就是要当前的节点信息写进去

// get my node info
//如果是SCHEDULER 那么 my_node直接赋值
    if (is_scheduler_) {
      my_node_ = scheduler_;
    } else {
      //判断在worker或者server下的角色操作,主要也是获取ip、host和端口等信息
      auto role = Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER;
      const char* nhost = Environment::Get()->find("DMLC_NODE_HOST");
      std::string ip;
      if (nhost) ip = std::string(nhost);
      if (ip.empty()) {
        const char* itf = Environment::Get()->find("DMLC_INTERFACE");
        std::string interface;
        if (itf) interface = std::string(itf);
        if (interface.size()) {
          GetIP(interface, &ip);
        } else {
          GetAvailableInterfaceAndIP(&interface, &ip);
        }
        CHECK(!interface.empty()) << "failed to get the interface";
      }
      int port = GetAvailablePort();
      const char* pstr = Environment::Get()->find("PORT");
      if (pstr) port = atoi(pstr);
      CHECK(!ip.empty()) << "failed to get ip";
      CHECK(port) << "failed to get a port";
      my_node_.hostname = ip;
      my_node_.role = role;
      my_node_.port = port;
      // cannot determine my id now, the scheduler will assign it later
      // set it explicitly to make re-register within a same process possible
      my_node_.id = Node::kEmpty;
      my_node_.customer_id = customer_id;
    }

获取基础的node信息之后就是绑定端口,这是为了后续通信服务使用

    // bind.
    my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40);
    PS_VLOG(1) << "Bind to " << my_node_.DebugString();
    CHECK_NE(my_node_.port, -1) << "bind failed";

接下来就是比较关键的点了,需要跟scheduler连接上

// connect to the scheduler
    Connect(scheduler_);

这一点乍一看有点问题,按道理worker和server说连接scheduler还好理解,你说scheduler本身也要连接自己?

在connect函数中

  // worker doesn't need to connect to the other workers. same for server
    if ((node.role == my_node_.role) && (node.id != my_node_.id)) {
      return;
    }

那么按照这段逻辑,在scheduler角色启动的时候,应该也是直接return,没有做任何的操作。跟这个源码的诸事应该保持一致。

至于其他的worker和server在启动的时候都要连接到 scheduler。这里也仅仅只是连接而已,只能说明在指定的端口,不同的node之间建立一个通道,实际上还没有进行任何有用的数据传输。

接下来就是启动一个接收消息的线程,负责消息的接收

// start receiver,启动一个消息接收的线程
    receiver_thread_ =
        std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));

真正干点事还是在这里

  if (!is_scheduler_) {
    // let the scheduler know myself
    Message msg;
    Node customer_specific_node = my_node_;
    customer_specific_node.customer_id = customer_id;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::ADD_NODE;
    msg.meta.control.node.push_back(customer_specific_node);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }

当当前的启动不是以 scheduler启动的时候,需要向 scheduler 注册信息,比如worker 节点1 启动了,那么需要让 scheduler 知道又个代号 1 的worker 节点即将上线。那么之前在启动scheduler 的那个消息接收的线程接收到这个请求之后,开始处理这个请求。

现在我们看看这个接收消息的线程是如何处理消息的?

void Van::Receiving() {
  Meta nodes;
  Meta recovery_nodes;  // store recovery nodes
  recovery_nodes.control.cmd = Control::ADD_NODE;

  while (true) {
    Message msg;
    int recv_bytes = RecvMsg(&msg);
    // For debug, drop received message
    if (ready_.load() && drop_rate_ > 0) {
      unsigned seed = time(NULL) + my_node_.id;
      if (rand_r(&seed) % 100 < drop_rate_) {
        LOG(WARNING) << "Drop message " << msg.DebugString();
        continue;
      }
    }

    CHECK_NE(recv_bytes, -1);
    recv_bytes_ += recv_bytes;
    if (Postoffice::Get()->verbose() >= 2) {
      PS_VLOG(2) << msg.DebugString();
    }
    // duplicated message
    if (resender_ && resender_->AddIncomming(msg)) continue;

    if (!msg.meta.control.empty()) {
      // control msg
      auto& ctrl = msg.meta.control;
      if (ctrl.cmd == Control::TERMINATE) {
        ProcessTerminateCommand();
        break;
      } else if (ctrl.cmd == Control::ADD_NODE) {
        ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes);
      } else if (ctrl.cmd == Control::BARRIER) {
        ProcessBarrierCommand(&msg);
      } else if (ctrl.cmd == Control::HEARTBEAT) {
        ProcessHearbeat(&msg);
      } else {
        LOG(WARNING) << "Drop unknown typed message " << msg.DebugString();
      }
    } else {
      ProcessDataMsg(&msg);
    }
  }
}

在上面的Receiving函数中重点是那个while循环,在循环外定义了

Meta nodes;
Meta recovery_nodes;  // store recovery nodes 这个是指之前有问题重新恢复使用的节点注册信息

在循环的刚开始那有些debug代码可以不用管

if (resender_ && resender_->AddIncomming(msg)) continue;

resender是一个重试机制,假设你之前的请求失败,失败的原因有很多比如超时,如果你设置了resender,那么就会重试

接下来就是要根据接受到的消息内容执行相应的任务

停止任务 ProcessTerminateCommand

void Van::ProcessTerminateCommand() {
  PS_VLOG(1) << my_node().ShortDebugString() << " is stopped";
  ready_ = false;
}

这个ready_标志位是表示当前这个节点的可用性,如果置为false那么就表示当前这个节点不可用。

新增节点 ProcessAddNodeCommand

void Van::ProcessAddNodeCommand(Message* msg, Meta* nodes,
                                Meta* recovery_nodes) {
  // 获取之前记录的dead node
  auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);
  std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());
  auto& ctrl = msg->meta.control;

  UpdateLocalID(msg, &dead_set, nodes, recovery_nodes);

  if (is_scheduler_) {
    ProcessAddNodeCommandAtScheduler(msg, nodes, recovery_nodes);
  } else {
    for (const auto& node : ctrl.node) {
      std::string addr_str = node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
        Connect(node);
        connected_nodes_[addr_str] = node.id;
      }
      if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
      if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
    }
    PS_VLOG(1) << my_node_.ShortDebugString() << " is connected to others";
    ready_ = true;
  }
}

UpdateLocalID 的函数作用如下

此函数作用是更新节点内部的node id 信息,也是分为两种情况,函数逻辑如下:

  • 如果msg->meta.sender是Meta::kEmpty,即未设定,则处理此message的一定是Scheduler,会进入 if 分支。
    • 如果目前 nodes 的control.node数目小于 “配置的server数目 + 配置的worker数目”,则说明是系统启动阶段,将当前消息的node信息加入到 control.node 之中。
    • 否则说明是系统运行阶段,应该是有些节点死掉重启后再次连接。那么,就从 nodes 的control.node 之中找到一个已经死掉的且节点role 与当前消息一致(同类型)的 node id,把这个 node id 赋给这个重启的节点。并且更新 nodes->control.node 和 recovery_nodes。
  • 下面就是普通节点处理的逻辑:
    • 即在 scheduler 传回来的所有节点信息中查找,目的是找到与自己的ip,port一致的节点。
    • 如果找到,就更新本地节点信息(因为在本节点启动时候,并没有设置 node_id 这个信息,这个需要scheduler统一设置,从注释看,目的是为了使重新注册成为可能)。包括全局 rank 信息。
void Van::UpdateLocalID(Message* msg, std::unordered_set<int>* deadnodes_set,
                        Meta* nodes, Meta* recovery_nodes) {
  auto& ctrl = msg->meta.control;
  // 获取这个ps系统worker和server的总数
  size_t num_nodes =
      Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
  // assign an id
  if (msg->meta.sender == Meta::kEmpty) {
    CHECK(is_scheduler_);
    CHECK_EQ(ctrl.node.size(), 1);
    //判断当前的已经正常运行的节点数与实际要求的节点数,如果小于实际要求数量,说明系统还处于启动阶段,还需要将
    //这个节点加进来,处理这个只有scheduler才能处理
    if (nodes->control.node.size() < num_nodes) {
      nodes->control.node.push_back(ctrl.node[0]);
    } else {
      // some node dies and restarts ,节点挂掉和重启的处理逻辑
      CHECK(ready_.load());
      for (size_t i = 0; i < nodes->control.node.size() - 1; ++i) {
        const auto& node = nodes->control.node[i];
        if (deadnodes_set->find(node.id) != deadnodes_set->end() &&
            node.role == ctrl.node[0].role) {
          auto& recovery_node = ctrl.node[0];
          // assign previous node id
          recovery_node.id = node.id;
          recovery_node.is_recovery = true;
          PS_VLOG(1) << "replace dead node " << node.DebugString()
                     << " by node " << recovery_node.DebugString();
          nodes->control.node[i] = recovery_node;
          recovery_nodes->control.node.push_back(recovery_node);
          break;
        }
      }
    }
  }

  // update my id,根据scheduler回传的数据更新自己的信息,这样才能保证注册成功
  for (size_t i = 0; i < ctrl.node.size(); ++i) {
    const auto& node = ctrl.node[i];
    if (my_node_.hostname == node.hostname && my_node_.port == node.port) {
      if (getenv("DMLC_RANK") == nullptr || my_node_.id == Meta::kEmpty) {
        my_node_ = node;
        std::string rank = std::to_string(Postoffice::IDtoRank(node.id));
#ifdef _MSC_VER
        _putenv_s("DMLC_RANK", rank.c_str());
#else
        setenv("DMLC_RANK", rank.c_str(), true);
#endif
      }
    }
  }
}

ok,接下来才是真正的注册了

sheduler 任务

// 在 sheduler 处理添加节点
if (is_scheduler_) {
    ProcessAddNodeCommandAtScheduler(msg, nodes, recovery_nodes);
  } 

看看 ProcessAddNodeCommandAtScheduler 做了哪些事情?

void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes,
                                           Meta* recovery_nodes) {
  recovery_nodes->control.cmd = Control::ADD_NODE;
  time_t t = time(NULL);
  // 还是获取系统要求的worker和server总数
  size_t num_nodes =
      Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
  // 判断实际收到注册的节点数与要求的节点数一致,那么就需要按照一定规则排序分配id
  if (nodes->control.node.size() == num_nodes) {
    // 按照 host+port的方式来排序
    std::sort(nodes->control.node.begin(), nodes->control.node.end(),
              [](const Node& a, const Node& b) {
                return (a.hostname.compare(b.hostname) | (a.port < b.port)) > 0;
              });
    // 分配节点rank值,循环遍历所有的节点
    for (auto& node : nodes->control.node) {
      std::string node_host_ip =
          node.hostname + ":" + std::to_string(node.port);
       //如果ip:port不存在van_中的话
      if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) {
        CHECK_EQ(node.id, Node::kEmpty);
        // 根据server和worker的角色调用rank to id 方法,这个在上一篇文章中有提到
        // server 2*rank+8 ,worker 2*rank+9
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        PS_VLOG(1) << "assign rank=" << id << " to node " << node.DebugString();
        node.id = id;
        // scheduler连接上这个节点
        Connect(node);
        // 更新心跳,这个心跳机制是为了监控这个节点是否还在线
        Postoffice::Get()->UpdateHeartbeat(node.id, t);
        connected_nodes_[node_host_ip] = id;
      } else {
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        shared_node_mapping_[id] = connected_nodes_[node_host_ip];
        node.id = connected_nodes_[node_host_ip];
      }
      // num_servers_ 这就是上面注释里面的rank(server 2*rank+8 ) 
      if (node.role == Node::SERVER) num_servers_++;
      if (node.role == Node::WORKER) num_workers_++;
    }
    // 开始向这些节点回复消息,消息的 cmd = Control::ADD_NODE
    nodes->control.node.push_back(my_node_);
    nodes->control.cmd = Control::ADD_NODE;
    Message back;
    back.meta = *nodes;
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      int recver_id = r;
      if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
        back.meta.recver = recver_id;
        back.meta.timestamp = timestamp_++;
        Send(back);
      }
    }
    PS_VLOG(1) << "the scheduler is connected to " << num_workers_
               << " workers and " << num_servers_ << " servers";
    // 这里表示 scheduler 已经准备好了
    ready_ = true;
  } else if (!recovery_nodes->control.node.empty()) {
    // 处理重启节点
    // 回去失去心跳的dead node
    auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);
    std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());
    // send back the recovery node
    CHECK_EQ(recovery_nodes->control.node.size(), 1);
    //连接节点和更新心跳
    Connect(recovery_nodes->control.node[0]);
    Postoffice::Get()->UpdateHeartbeat(recovery_nodes->control.node[0].id, t);
    Message back;
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      if (r != recovery_nodes->control.node[0].id &&
          dead_set.find(r) != dead_set.end()) {
        // do not try to send anything to dead node
        continue;
      }
      // only send recovery_node to nodes already exist
      // but send all nodes to the recovery_node
      back.meta =
          (r == recovery_nodes->control.node[0].id) ? *nodes : *recovery_nodes;
      back.meta.recver = r;
      back.meta.timestamp = timestamp_++;
      Send(back);
    }
  }
}

worker和server任务

for (const auto& node : ctrl.node) {
      std::string addr_str = node.hostname + ":" + std::to_string(node.port);
      //在已连接的节点里去找这个要即将 add node 的节点
      if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
        //没找到则说明是个新节点,那么就连接这个节点并且记录下来
        Connect(node);
        connected_nodes_[addr_str] = node.id;
      }
      //根据角色名更新系统的server 和worker计数,
      if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
      if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
    }
    PS_VLOG(1) << my_node_.ShortDebugString() << " is connected to others";
    ready_ = true;

至此 ProcessAddNodeCommand 所有的任务都完成了。

阻塞任务 ProcessBarrierCommand

这块在上一篇文章讲了,我放个链接吧!https://www.deeplearn.me/4303.html

心跳管理 ProcessHearbeat

void Van::ProcessHearbeat(Message* msg) {
  auto& ctrl = msg->meta.control;
  time_t t = time(NULL);
  for (auto& node : ctrl.node) {
    //定时更新心跳,心跳记录即使时间戳
    Postoffice::Get()->UpdateHeartbeat(node.id, t);
    //scheduler节点需要相应心跳请求
    if (is_scheduler_) {
      Message heartbeat_ack;
      heartbeat_ack.meta.recver = node.id;
      heartbeat_ack.meta.control.cmd = Control::HEARTBEAT;
      heartbeat_ack.meta.control.node.push_back(my_node_);
      heartbeat_ack.meta.timestamp = timestamp_++;
      // send back heartbeat
      Send(heartbeat_ack);
    }
  }
}

为了记录网络的可达性,PS Lite 设计了心跳机制。具体而言:

  • 每一个节点的 PostOffice 单例中维护了一个 MAP 结构,存储了心跳关联的节点的活跃信息。键为节点编号,值为上次收到其 HEARTBEAT 消息的时间戳。
  • Worker/Server 只记录 Scheduler 的心跳,Scheduler 则记录所有节点的心跳。基于时间戳和心跳超时,可以输出所有的死亡节点。
  • 每一个 Worker/Server 节点,会新建立一个心跳线程,每隔 PS_HEARTBEAT_INTERVAL 秒向 Scheduler 发送一条 HEARTBEAT 消息;
  • Scheduler 节点收到后,响应一个 HEARTBEAT 消息。
  • scheduler进行应答,通过当前时间与心跳包接收时间之差判断是否alive。
  • Scheduler 会依据心跳节点的时间戳来判断死亡节点。如果新增的节点id在dead_node容器里,表示这个节点是重新恢复的;而新增节点通过schedular的中转与现有节点形成互联。

在Van::start函数中,给出了心跳的调用,对于非 scheduler 节点启动了一个线程,每隔 PS_HEARTBEAT_INTERVAL 秒向 Scheduler 发送一条 HEARTBEAT 消息:

    if (!is_scheduler_) {
      // start heartbeat thread
      heartbeat_thread_ =
          std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
    }

具体心跳函数是:

void Van::Heartbeat() {
  //从环境变量里读取间隔时间,定时发送心跳请求
  const char* val = Environment::Get()->find("PS_HEARTBEAT_INTERVAL");
  const int interval = val ? atoi(val) : kDefaultHeartbeatInterval;
  while (interval > 0 && ready_.load()) {
    std::this_thread::sleep_for(std::chrono::seconds(interval));
    Message msg;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::HEARTBEAT;
    msg.meta.control.node.push_back(my_node_);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }
}

差不多到这里 Van的启动已经基本上都完成了。其实在Van里面还有关于通信消息的优化,比如Pb序列化,有兴趣可以去看看。

正文完
请博主喝杯咖啡吧!
post-qrcode
 
admin
版权声明:本站原创文章,由 admin 2023-03-11发表,共计17000字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
验证码