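Sometimes a BSP job needs to designate one task as the master. To do that, you need information about every peer (task), such as its host name and port number.
In Hama, getting the host name and port of every peer is easy: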
peer.getAllPeerNames();
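This method returns a String[] with one entry per peer, each in the form "hostname:port". The hostname is the host name of the machine the peer runs on, and the port is the one that peer uses on that machine to communicate with other peers. In local mode the hostname is local and the ports are 0, 1, 2, 3, and so on. In distributed mode the hostname is the corresponding host name from the Hama configuration file and /etc/hosts, and with the default configuration the ports are 61001, 61002, and so on.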
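Since each entry is a plain "hostname:port" string, it helps to split it into its two parts before comparing peers. The following is a minimal sketch; the PeerAddress class and its parse method are hypothetical helpers written for this post, not part of Hama.

```java
// Hypothetical helper (not part of Hama): holds the two halves of a
// "hostname:port" peer name as returned by peer.getAllPeerNames().
final class PeerAddress {
    final String host;
    final int port;

    PeerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Splits at the last ':' and parses the part after it as the port.
    static PeerAddress parse(String peerName) {
        int idx = peerName.lastIndexOf(':');
        if (idx <= 0 || idx == peerName.length() - 1) {
            throw new IllegalArgumentException(
                "Expected hostname:port, got: " + peerName);
        }
        String host = peerName.substring(0, idx);
        int port = Integer.parseInt(peerName.substring(idx + 1));
        return new PeerAddress(host, port);
    }

    public static void main(String[] args) {
        PeerAddress a = PeerAddress.parse("node1:61001");
        System.out.println(a.host + " " + a.port);
    }
}
```

The host name "node1" in main is made up for illustration; in a real job the strings come from peer.getAllPeerNames().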
In the setup method of a Hama BSP you can elect one task as the master as needed, for example:
// masterTask is assumed to be a String field of the enclosing BSP class.
public void setup(
    BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, TupleWritable> peer)
    throws IOException, SyncException, InterruptedException {
  String[] allPeerNames = peer.getAllPeerNames();
  // Project-specific Configuration class, not Hama's.
  Configuration conf = new Configuration("master");
  String master = conf.getItemValue("master");
  // Start above any real port so that port 0 (local mode) can still win.
  int port = Integer.MAX_VALUE;
  for (String peerName : allPeerNames) {
    String[] parts = peerName.split(":");
    if (parts[0].equals(master) && Integer.parseInt(parts[1]) < port) {
      port = Integer.parseInt(parts[1]);
      masterTask = peerName;
    }
  }
  // masterTask stays null if no peer runs on the configured host.
  if (peer.getPeerName().equals(masterTask)) {
    // Send messages to the other peers.
  }
}
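The Configuration class here is defined in the project itself; it is not Hama's Configuration. The code reads the master's host name from the project configuration file and then picks the peer with the smallest port number on that host as the master.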
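The selection rule can be isolated into a pure function, which makes it easy to check without starting a job. This is only a sketch: MasterElection and electMaster are hypothetical names, and it assumes every peer sees the same set of peer names so that all of them arrive at the same master.

```java
// Hypothetical helper (not part of Hama): picks the peer on masterHost
// with the smallest port from a list of "hostname:port" names.
final class MasterElection {

    // Returns the elected peer name, or null if no peer runs on masterHost.
    static String electMaster(String[] allPeerNames, String masterHost) {
        String master = null;
        int minPort = Integer.MAX_VALUE; // above any real port, so port 0 can win
        for (String peerName : allPeerNames) {
            String[] parts = peerName.split(":");
            if (parts.length != 2 || !parts[0].equals(masterHost)) {
                continue; // malformed entry or a different host
            }
            int port = Integer.parseInt(parts[1]);
            if (port < minPort) {
                minPort = port;
                master = peerName;
            }
        }
        return master;
    }

    public static void main(String[] args) {
        // Made-up host names for illustration.
        String[] peers = {"node1:61002", "node2:61001", "node1:61001"};
        System.out.println(electMaster(peers, "node1")); // prints node1:61001
    }
}
```

Using Integer.MAX_VALUE as the starting value, rather than 0, matters in local mode, where 0 is itself a valid port.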
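Note: once the master task has been chosen, it may need to send messages to the other peers. In that case, do not forget to call peer.sync() after the code that sends the messages; sync() can be used inside setup as well. Without it, Hama performs no synchronization after setup returns. The part of the Hama source that executes a BSP job shows this: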
In local mode, the run method in the org.apache.hama.bsp.LocalBSPRunner class:
peer = new BSPPeerImpl(job, conf,
    new TaskAttemptID(new TaskID(job.getJobID(), id), id),
    new LocalUmbilical(), id, splitname, realBytes, new Counters());
// Throw the first exception and log all the other exception.
Exception firstException = null;
try {
  bsp.setup(peer);
  bsp.bsp(peer);
} catch (Exception e) {
  LOG.error("Exception during BSP execution!", e);
  firstException = e;
} finally {
  try {
    bsp.cleanup(peer);
  } catch (Exception e) {
    LOG.error("Error cleaning up after bsp execution.", e);
    if (firstException == null)
      firstException = e;
  } finally {
    try {
      peer.clear();
      peer.close();
    } catch (Exception e) {
      LOG.error("Exception closing BSP peer,", e);
      if (firstException == null)
        firstException = e;
    } finally {
      if (firstException != null)
        throw firstException;
    }
  }
In distributed mode, the runBSP method in the org.apache.hama.bsp.BSPTask class:
@SuppressWarnings("unchecked")
private final static <KEYIN, VALUEIN, KEYOUT, VALUEOUT, M extends Writable> void runBSP(
    final BSPJob job,
    BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bspPeer,
    final BytesWritable rawSplit, final BSPPeerProtocol umbilical)
    throws Exception {
  BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bsp =
      (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M>) ReflectionUtils
          .newInstance(job.getConfiguration().getClass("bsp.work.class", BSP.class),
              job.getConfiguration());
  // The policy is to throw the first exception and log the remaining.
  Exception firstException = null;
  try {
    bsp.setup(bspPeer);
    bsp.bsp(bspPeer);
  } catch (Exception e) {
    LOG.error("Error running bsp setup and bsp function.", e);
    firstException = e;
  } finally {
    try {
      bsp.cleanup(bspPeer);
    } catch (Exception e) {
      LOG.error("Error cleaning up after bsp executed.", e);
      if (firstException == null)
        firstException = e;
    } finally {
      try {
        bspPeer.close();
      } catch (Exception e) {
        LOG.error("Error closing BSP Peer.", e);
        if (firstException == null)
          firstException = e;
      }
      if (firstException != null)
        throw firstException;
    }
  }
}
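As you can see, bsp is called immediately after setup. If setup does not call sync, the master task may still be sending messages while other peers have already entered bsp and started the first superstep; when they try to read the master's messages at that point, they get nothing. Alternatively, the first round of message sending can simply be moved into the first superstep.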
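The effect of the barrier can be illustrated without Hama at all. The sketch below is a plain-Java simulation, not Hama code: threads stand in for peers, a CyclicBarrier stands in for peer.sync(), and in-memory queues stand in for message delivery (a simplification, since Hama delivers messages as part of the sync itself). All names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;

// Simulation only: shows that a barrier between "setup" and "bsp" lets
// every peer see the master's message. Peer 0 plays the master.
final class SetupBarrierDemo {

    // Runs n simulated peers and returns what each one read after the barrier.
    static String[] run(int n) {
        List<ConcurrentLinkedQueue<String>> inbox = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            inbox.add(new ConcurrentLinkedQueue<>());
        }
        CyclicBarrier barrier = new CyclicBarrier(n); // stands in for peer.sync()
        String[] received = new String[n];
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                try {
                    // "setup": only the master sends.
                    if (id == 0) {
                        for (int j = 1; j < n; j++) {
                            inbox.get(j).add("hello from master");
                        }
                    }
                    barrier.await(); // nobody proceeds until all peers arrive
                    // "bsp": reading is safe now.
                    received[id] = inbox.get(id).poll();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        String[] result = run(4);
        for (int i = 0; i < result.length; i++) {
            System.out.println("peer " + i + " read: " + result[i]);
        }
    }
}
```

If the barrier.await() line is removed, a non-master thread can reach poll() before the master has added anything and read null, which is the same race described above for a setup method without sync.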