现在的位置: 首页 > 综合 > 正文

Hama学习笔记(6)-获取各个peer(task)的信息、确定master task

2018年05月18日 ⁄ 综合 ⁄ 共 3019字 ⁄ 字号 评论关闭

有时候在bsp job中需要确定一个master task,这就需要获取各个peer(task)的信息,如host name、端口号等等。

hama中获取peer的主机名和端口号很方便:

peer.getAllPeerNames();

这个方法会返回一个String[],每一项对应一个peer,内容是"hostname:port",hostname就是peer所在机器的主机名,端口就是这个peer在那台机器上用来和其他peer通信的端口。在单机模式下hostname是local,port是0、1、2、3.。。,分布式模式下hostname是hama配置文件和/etc/hosts文件中相应的主机名,端口号默认配置下是61001、61002.。。等等

可以在hama bsp的setup方法中根据需要选举一个tast作为master,例如:

	public void setup(
			BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, TupleWritable> peer)
			throws IOException, SyncException, InterruptedException
	{
		String[] allPeerNames = peer.getAllPeerNames();
		int port = 0;
		Configuration conf = new Configuration("master");
		String master = conf.getItemValue("master");
		for (String peerName : allPeerNames)
		{
			if (peerName.split(":")[0].equals(master))
			{
				if (port == 0 || Integer.parseInt(peerName.split(":")[1]) < port)
				{
					port = Integer.parseInt(peerName.split(":")[1]);
					masterTask = peerName;
				}
			}
		}
		if (masterTask.equals(peer.getPeerName()))
		{
			//向其他peer发送消息
		}
	}

这里的Configuration是项目中自定义的,不是hama的Configuration。从项目配置文件中读取master所在的主机名,然后选择这台主机上端口号最小的一个作为master。

注意:也许有时候在确定了master task之后还需要给其他的peer发送消息,这种情况下不要忘了在发送消息的代码之后加上一个peer.sync(),这个方法在setup方法中也是可以用的。如果不加sync的话,hama在执行完setup方法之后是不会进行同步的,找到hama源码中执行bsp job的一段,即:

单机模式下org.apache.hama.bspLocalBSPRunner类中的run方法:

      peer = new BSPPeerImpl(job, conf, new TaskAttemptID(new TaskID(
          job.getJobID(), id), id), new LocalUmbilical(), id, splitname,
          realBytes, new Counters());
      // Throw the first exception and log all the other exception.
      Exception firstException = null;
      try {
        bsp.setup(peer);
        bsp.bsp(peer);
      } catch (Exception e) {
        LOG.error("Exception during BSP execution!", e);
        firstException = e;
      } finally {
        try {
          bsp.cleanup(peer);
        } catch (Exception e) {
          LOG.error("Error cleaning up after bsp execution.", e);
          if (firstException == null)
            firstException = e;
        } finally {
          try {
            peer.clear();
            peer.close();
          } catch (Exception e) {
            LOG.error("Exception closing BSP peer,", e);
            if (firstException == null)
              firstException = e;
          } finally {
            if (firstException != null)
              throw firstException;
          }
        }

分布式模式下org.apache.hama.bspBSPTask类中的runBSP方法:

  @SuppressWarnings("unchecked")
  private final static <KEYIN, VALUEIN, KEYOUT, VALUEOUT, M extends Writable> void runBSP(
      final BSPJob job,
      BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bspPeer,
      final BytesWritable rawSplit, final BSPPeerProtocol umbilical)
      throws Exception {

    BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M>) ReflectionUtils
        .newInstance(job.getConfiguration().getClass("bsp.work.class", BSP.class),
            job.getConfiguration());

    // The policy is to throw the first exception and log the remaining.
    Exception firstException = null;
    try {
      bsp.setup(bspPeer);
      bsp.bsp(bspPeer);
    } catch (Exception e) {
      LOG.error("Error running bsp setup and bsp function.", e);
      firstException = e;
    } finally {
      try {
        bsp.cleanup(bspPeer);
      } catch (Exception e) {
        LOG.error("Error cleaning up after bsp executed.", e);
        if (firstException == null)
          firstException = e;
      } finally {

        try {
          bspPeer.close();
        } catch (Exception e) {
          LOG.error("Error closing BSP Peer.", e);
          if (firstException == null)
            firstException = e;
        }
        if (firstException != null)
          throw firstException;
      }
    }
  }

可见,调用setup之后紧接着就是调用bsp方法,setup中不加sync的话,可能master task正在发送消息,其他的peer已经进入bsp函数执行第一个超步了,这时他们试图读取master task发来的消息是读不到的。当然啦,也可以发第一轮消息发送放在第一个超步中。

抱歉!评论已关闭.