现在的位置: 首页 > 综合 > 正文

Hama学习笔记(3)-编写BSP程序

2018年05月18日 ⁄ 综合 ⁄ 共 2661字 ⁄ 字号 评论关闭

Hama中提供了BSP框架的编程接口,就像MapReduce一样方便使用。

[引用请注明出处:http://blog.csdn.net/bhq2010/article/details/8531243]

BSP框架

首先明确一下BSP的概念:

BSP是一个计算框架,按照这个框架编写的BSP程序会在集群的各个节点上做本地的I/O和计算,这一点和MapReduce相似(其实BSP的提出比MapReduce还要早差不多10年,应该算前辈才是~),但不同的是BSP框架中,各个节点之间可以进行比较有效的通信。

一个BSP程序(或者叫BSP Job)的执行过程中包含了若干个超步(Supersteps),每个超步的执行过程又有以下三个步骤:

各个节点本地的计算->节点间通信->节点同步

第一和第二个步骤之前其实没有明确的界限。在一个超步中,各个结点在进入同步状态之前可以随时进行I/O和通信。

当某个结点认为自己的计算任务已经完成时,可以进入同步状态并挂起。当一个超步中所有的结点都进入同步状态时,一个超步就结束了,各个节点从挂起处开始继续执行,所有结点都退出时,整个BSP程序就结束了。

继承BSP类

Hama中编写BSP程序和Hadoop MapReduce差不多,首先写一个类,继承Hama API中的BSP抽象类,例如:

public static class MyEstimator
     extends BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable>

这个类不一定是static的,以上只是hama.example里计算PI的一个例程。

然后要实现BSP类中的抽象方法bsp,例如:

public void bsp(BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
	throws IOException, SyncException, InterruptedException
{.....}

此外,和Hadoop MapReduce类似,BSP类中还有两个方法可以重载:setup和cleanup

这两个方法分别在一个BSP程序执行前后进行初始化和清理的工作。

一个完整的BSP程序见上一篇日志:http://blog.csdn.net/bhq2010/article/details/8513052

文件I/O

在配置BSP Job时,可以为其指定输入输出格式和路径,和Hadoop很相似,例如:

job.setInputPath(new Path("/tmp/sequence.dat");
  job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
  or,
  SequenceFileInputFormat.addInputPath(job, new Path("/tmp/sequence.dat"));
  or,
  SequenceFileInputFormat.addInputPaths(job, "/tmp/seq1.dat,/tmp/seq2.dat,/tmp/seq3.dat");
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setOutputFormat(TextOutputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path("/tmp/result"));

其中setInputFormat和setOutputFormat是设置输入和输出文件的格式的,默认是文本格式的,这和Hadoop的setInputFormatClass、setOutputFormatClass作用一样。这样在bsp方法中就可以用BSPPeer类型的参数peer来读取输入文件(通常是在HDFS上)并向输出文件中写入了,例如:

public final void bsp(
      BSPPeer<LongWritable, Text, Text, LongWritable, Text> peer)
      throws IOException, InterruptedException, SyncException {
      
      // this method reads the next key value record from file
      KeyValuePair<LongWritable, Text> pair = peer.readNext();

      // the following lines do the same:
      LongWritable key = new LongWritable();
      Text value = new Text();
      peer.readNext(key, value);
      
      // write
      peer.write(value, key);
  }

需要重新打开输入文件重新读取,可以用peer.reopenInput()方法。

此外,在bsp中也可已随意访问合法的文件,不过这些文件IO就没法在配置BSP Job时指定,而只能硬编码了。

计算结点间通信

Hama为BSP提供的通信API如下:

方法 描述
send(String peerName, BSPMessage msg) Sends a message to another peer.
getCurrentMessage() Returns a received message.
getNumCurrentMessages() Returns the number of received messages.
sync() Barrier synchronization.
getPeerName() Returns a peer's hostname.
getAllPeerNames() Returns all peer's hostname.
getSuperstepCount() Returns the count of supersteps

这些都是bsp方法的参数peer的方法,像上面调用read、write方法一样调用即可。

同步

调用peer.sync()方法可以使当前节点进入同步状态,当所有的节点都进入同步状态后,同步完成,开始下一个超步或者结束Job。

[引用请注明出处:http://blog.csdn.net/bhq2010/article/details/8531243]

抱歉!评论已关闭.