现在的位置: 首页 > 搜索技术 > 黄专家专栏 > 正文

Hadoop Local 模式运行 Pipes 程序

2014年10月30日 搜索技术, 黄专家专栏 ⁄ 共 2024字 ⁄ 字号 评论关闭

现在用的 hadoop 的版本是 0.20.2-cdh3u6。 cdh 是 Cloudera 的开源版本。

使用 local 模式的时候,会出现几个错误,记录如下:

1. java.lang.NullPointerException 异常

1
2
3
4
5
6
7
java.lang.Exception: java.lang.NullPointerException
  at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:349)
Caused by: java.lang.NullPointerException
  at org.apache.hadoop.mapred.pipes.Application.<init>(Application.java:103)
  at org.apache.hadoop.mapred.pipes.PipesMapRunner.run(PipesMapRunner.java:68)
  at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:390)
  ... ...

追踪以上错误的代码,我们可以知道,空指针异常是由于 jobToken 引起的,深入代码可以知道,hadoop pipes 从 TokenCache 中读入一个叫 “ShuffleAndJobToken” 的 token,然后写入一个 jobTokenPassword 文件。但是在 local 模式下并没有这个 key 对应的 token, 也就无从写入。 所以,修改代码

1
2
Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf.getCredentials());
byte[]  password = jobToken.getPassword();

1
2
3
4
5
Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf.getCredentials());
byte[] password = "no password".getBytes();
if (jobToken != null) {
  password = jobToken.getPassword();
}

2. jobTokenPassword 的文件权限

修改 jobTokenPassword 的文件权限

1
2
FSDataOutputStream out = FileSystem.create(localFs, localPath,
  new FsPermission("400"));

改为

1
2
FSDataOutputStream out = FileSystem.create(localFs, localPath,
  new FsPermission("666"));

3. userlog 目录的生成

在 src/mapred/org/apache/hadoop/mapred/pipes/Application.java 文件中,一下代码会将标准输入和标准错误重定向到你的日志文件中。

一般日志文件是在 ${hadoop.log.dir}/userlog/${jobid}/${taskid}/stdout 这样的方式出现的,但是在 local 模式中,不会为你建立这样的目录,所以导致执行 pipes 的 c++ 进程失败。

修改代码建立日志目录即可

1
2
3
File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
long logLength = TaskLog.getTaskLogLength(conf);

加入建立目录的代码

1
2
3
4
5
6
7
8
9
10
11
File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
long logLength = TaskLog.getTaskLogLength(conf);

String[] dirs = new String[1];
dirs[0] = TaskLog.getAttemptDir(taskid, false).toString();
try {
  TaskLog.createTaskAttemptLogDir(taskid, false, dirs);
} catch (IOException e) {
  LOG.info("Creation of failed.");      // 日志目录已经存在
}

现在 local 模式的 pipes 程序就可以运行了

抱歉!评论已关闭.