现在的位置: 首页 > 综合 > 正文

hama学习笔记(2)-在eclipse中编译hama源码、写hama job

2018年05月18日 ⁄ 综合 ⁄ 共 4126字 ⁄ 字号 评论关闭

写hama job

如果只是写hama job,根本不需要eclipse,所有的代码都可一在一个java文件中搞定。不过用惯了eclipse的人表示vim之类的实在用不惯。

在eclipse中可以建一个user library:

在eclipse菜单栏中:Window->Preferences->Java->Build Path->User Libraries->New新建一个user library,例如hama-0.6.0,勾选System Library。然后Add External JARs,将HAMA_HOME/lib中的jar包和HAMA_HOME下的jar包加进来。

新建Java Project时将这个user library加入工程,就OK了,可以试试hama example中计算PI的例子:

[转载请注明出处:http://blog.csdn.net/bhq2010/article/details/8513052]

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.FileOutputFormat;
import org.apache.hama.bsp.NullInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
import org.apache.hama.bsp.sync.SyncException;

public class PiEstimator
{
    private static Path TMP_OUTPUT = new Path("/tmp/pi-"
	    + System.currentTimeMillis());

    public static class MyEstimator
	    extends
	    BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable>
    {
	public static final Log LOG = LogFactory.getLog(MyEstimator.class);
	private String masterTask;
	private static final int iterations = 10000;

	@Override
	public void bsp(
		BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
		throws IOException, SyncException, InterruptedException
	{

	    int in = 0;
	    for (int i = 0; i < iterations; i++)
	    {
		double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
		if ((Math.sqrt(x * x + y * y) < 1.0))
		{
		    in++;
		}
	    }

	    double data = 4.0 * in / iterations;

	    peer.send(masterTask, new DoubleWritable(data));
	    peer.sync();
	}

	@Override
	public void setup(
		BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
		throws IOException
	{
	    // Choose one as a master
	    this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
	}

	@Override
	public void cleanup(
		BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
		throws IOException
	{
	    if (peer.getPeerName().equals(masterTask))
	    {
		double pi = 0.0;
		int numPeers = peer.getNumCurrentMessages();
		DoubleWritable received;
		while ((received = peer.getCurrentMessage()) != null)
		{
		    pi += received.get();
		}

		pi = pi / numPeers;
		peer.write(new Text("Estimated value of PI is"),
			new DoubleWritable(pi));
	    }
	}
    }

    static void printOutput(HamaConfiguration conf) throws IOException
    {
	FileSystem fs = FileSystem.get(conf);
	FileStatus[] files = fs.listStatus(TMP_OUTPUT);
	for (int i = 0; i < files.length; i++)
	{
	    if (files[i].getLen() > 0)
	    {
		FSDataInputStream in = fs.open(files[i].getPath());
		IOUtils.copyBytes(in, System.out, conf, false);
		in.close();
		break;
	    }
	}

	fs.delete(TMP_OUTPUT, true);
    }

    public static void main(String[] args) throws InterruptedException,
	    IOException, ClassNotFoundException
    {
	// BSP job configuration
	HamaConfiguration conf = new HamaConfiguration();

	BSPJob bsp = new BSPJob(conf, PiEstimator.class);
	// Set the job name
	bsp.setJobName("Pi Estimation Example");
	bsp.setBspClass(MyEstimator.class);
	bsp.setInputFormat(NullInputFormat.class);
	bsp.setOutputKeyClass(Text.class);
	bsp.setOutputValueClass(DoubleWritable.class);
	bsp.setOutputFormat(TextOutputFormat.class);
	FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);

	BSPJobClient jobClient = new BSPJobClient(conf);
	ClusterStatus cluster = jobClient.getClusterStatus(true);

	if (args.length > 0)
	{
	    bsp.setNumBspTask(Integer.parseInt(args[0]));
	} else
	{
	    // Set to maximum
	    bsp.setNumBspTask(cluster.getMaxTasks());
	}

	long startTime = System.currentTimeMillis();
	if (bsp.waitForCompletion(true))
	{
	    printOutput(conf);
	    System.out.println("Job Finished in "
		    + (System.currentTimeMillis() - startTime) / 1000.0
		    + " seconds");
	}
    }
}

Run as Java Application即可,这样运行是在单机模式下的,不需要安装和启动Hama集群。如果要在集群上运行可以将工程Export成Jar文件,发到集群上运行。

编译hama源码

hama的源码工程是用maven构建的,下载hama的src包,解压;

在eclipse中安装m2e即可Import->Maven->Existing Maven Project->选择解压后的hama源码所在的目录,就可以导入,第一次导入时,maven会去下载依赖的包,所以时间比较长。

导入后有hama-core\hama-graph等6个工程,之后就可以用maven插件编译、调试,研究hama源码了。

如果不用eclipse,则需要下载安装maven2,过程google一下,随处可见。

为了省事,可以下载Juno版的eclipse for jave EE developer.这个版本的eclipse中带有了完整的m2e插件。

抱歉!评论已关闭.