现在的位置: 首页 > 综合 > 正文

Hama学习笔记(4)-消息的发送与存储

2018年05月18日 ⁄ 综合 ⁄ 共 3069字 ⁄ 字号 评论关闭

Hama的文档目前还不详细,没有找到关于Hama如何发送消息的说明,只好自己动手做实验了。

按照BSP的模型,每一个超步应该是有明确的三个步骤的:计算->通信->同步

但是Hama当中,在节点进入同步状态之前,是否可以和其他结点即时地收发消息呢?如果可以,无疑会使得bsp程序更加灵活,但是这样也会带来不必要的麻烦:如果bsp程序设计不当,各个节点之间随意通信可能会使得程序的性能非常糟糕。并且这样也增加了容错的难度。

为了搞清楚通信的情况,做了如下实验:

打开计算PI的example,在MyEstimator的bsp方法中调用peer.send之前设置断点,其实send方法的注释文档就已经告诉我们一些有用的信息了:

   * Send a data with a tag to another BSPSlave corresponding to hostname.
   * Messages sent by this method are not guaranteed to be received in a sent
   * order.

可见hama bsp并不严格地保证消息的接收和发送顺序一致。

在eclipse中开始debug(单击模式,此时hama用多线程来模拟多个节点的计算),发现调用send最终执行的是org.apache.hama.bsp.message.AbstractMessageManager.send()中的一段代码:

    InetSocketAddress targetPeerAddress = null;
    // Get socket for target peer.
    if (peerSocketCache.containsKey(peerName)) {
      targetPeerAddress = peerSocketCache.get(peerName);
    } else {
      targetPeerAddress = BSPNetUtils.getAddress(peerName);
      peerSocketCache.put(peerName, targetPeerAddress);
    }
    MessageQueue<M> queue = outgoingQueues.get(targetPeerAddress);
    if (queue == null) {
      queue = getQueue();
    }
    queue.add(msg);
    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
    outgoingQueues.put(targetPeerAddress, queue);
    notifySentMessage(peerName, msg);

可见这里是将要发送的消息添加到队列中了,并没有将消息发送出去。

继续debug,发现调用sync方法执行的是如下的代码:

    // normally all messages should been send now, finalizing the send phase
    messenger.finishSendPhase();
    Iterator<Entry<InetSocketAddress, MessageQueue<M>>> it = messenger
        .getMessageIterator();

    while (it.hasNext()) {
      Entry<InetSocketAddress, MessageQueue<M>> entry = it.next();
      final InetSocketAddress addr = entry.getKey();
      final Iterable<M> messages = entry.getValue();

      final BSPMessageBundle<M> bundle = combineMessages(messages);
      // remove this message during runtime to save a bit of memory
      it.remove();
      try {
        messenger.transfer(addr, bundle);
      } catch (Exception e) {
        LOG.error("Error while sending messages", e);
      }
    }

    if (this.faultToleranceService != null) {
      try {
        this.faultToleranceService.beforeBarrier();
      } catch (Exception e) {
        throw new IOException(e);
      }
    }

    long startBarrier = System.currentTimeMillis();
    enterBarrier();

    if (this.faultToleranceService != null) {
      try {
        this.faultToleranceService.duringBarrier();
      } catch (Exception e) {
        throw new IOException(e);
      }
    }

    // Clear outgoing queues.
    messenger.clearOutgoingQueues();

    leaveBarrier();

    incrementCounter(PeerCounter.TIME_IN_SYNC_MS,
        (System.currentTimeMillis() - startBarrier));
    incrementCounter(PeerCounter.SUPERSTEP_SUM, 1L);

    currentTaskStatus.setCounters(counters);

    if (this.faultToleranceService != null) {
      try {
        this.faultToleranceService.afterBarrier();
      } catch (Exception e) {
        throw new IOException(e);
      }
    }

    umbilical.statusUpdate(taskId, currentTaskStatus);

从第一行的注释即可看出,之前send要发送的消息在开始同步时才会真正地发送出去。此外,貌似在

messenger.clearOutgoingQueues();

中会准备好本地的消息队列,之后才可以读取从其他结点发送过来的消息,具体怎么收消息还没研究好,经过实验发现似乎是在sync返回之后,才能接受到从其他节点发送过来的消息,在sync之前getCurrentMessage()得到的消息总是空值。

由此大概得出了结论:hama bsp在一个超步中只能发消息或者处理上一个超步中接收到的消息。

此外,Hama源码中org.apache.hama.bsp.message.AbstractMessageManager中,用于接收消息的localQueue是用getQueue方法初始化的,而getQueue默认返回的是MemoryQueue,也就是说:除非配置了使用DiskQueue,Hama bsp会将收到的消息放在内存中。从org.apache.hama.bsp.message.AbstractMessageManager.sned()的实现也可以到,发送队列也是用getQueue初始化的,也就有了和接受队列一样的队列类型。

再者,上一个超步中接收到的数据,必须在紧接着的下一个超步中处理完毕,否则接收队列会被清空。

【引用请注明出处:http://blog.csdn.net/bhq2010/article/details/8548070。谢谢!】

抱歉!评论已关闭.