現在的位置: 首頁 > 搜索技術 > 黃專家專欄 > 正文

Hadoop Local 模式運行 Pipes 程序

2014年10月30日 搜索技術, 黃專家專欄 ⁄ 共 2024字 ⁄ 字型大小 評論關閉

現在用的 hadoop 的版本是 0.20.2-cdh3u6。 cdh 是 Cloudera 的開源版本。

使用 local 模式的時候,會出現幾個錯誤,記錄如下:

1. java.lang.NullPointerException 異常

1
2
3
4
5
6
7
java.lang.Exception: java.lang.NullPointerException
  at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:349)
Caused by: java.lang.NullPointerException
  at org.apache.hadoop.mapred.pipes.Application.<init>(Application.java:103)
  at org.apache.hadoop.mapred.pipes.PipesMapRunner.run(PipesMapRunner.java:68)
  at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:390)
  ... ...

追蹤以上錯誤的代碼,我們可以知道,空指針異常是由於 jobToken 引起的,深入代碼可以知道,hadoop pipes 從 TokenCache 中讀入一個叫 「ShuffleAndJobToken」 的 token,然後寫入一個 jobTokenPassword 文件。但是在 local 模式下並沒有這個 key 對應的 token, 也就無從寫入。 所以,修改代碼

1
2
Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf.getCredentials());
byte[]  password = jobToken.getPassword();

1
2
3
4
5
Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(conf.getCredentials());
byte[] password = "no password".getBytes();
if (jobToken != null) {
  password = jobToken.getPassword();
}

2. jobTokenPassword 的文件許可權

修改 jobTokenPassword 的文件許可權

1
2
FSDataOutputStream out = FileSystem.create(localFs, localPath,
  new FsPermission("400"));

改為

1
2
FSDataOutputStream out = FileSystem.create(localFs, localPath,
  new FsPermission("666"));

3. userlog 目錄的生成

在 src/mapred/org/apache/hadoop/mapred/pipes/Application.java 文件中,一下代碼會將標準輸入和標準錯誤重定向到你的日誌文件中。

一般日誌文件是在 ${hadoop.log.dir}/userlog/${jobid}/${taskid}/stdout 這樣的方式出現的,但是在 local 模式中,不會為你建立這樣的目錄,所以導致執行 pipes 的 c++ 進程失敗。

修改代碼建立日誌目錄即可

1
2
3
File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
long logLength = TaskLog.getTaskLogLength(conf);

加入建立目錄的代碼

1
2
3
4
5
6
7
8
9
10
11
File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
long logLength = TaskLog.getTaskLogLength(conf);

String[] dirs = new String[1];
dirs[0] = TaskLog.getAttemptDir(taskid, false).toString();
try {
  TaskLog.createTaskAttemptLogDir(taskid, false, dirs);
} catch (IOException e) {
  LOG.info("Creation of failed.");      // 日誌目錄已經存在
}

現在 local 模式的 pipes 程序就可以運行了

抱歉!評論已關閉.