上一篇博文里,已经在linux环境下实现单机版JNI的验证。这一篇,进入“阶段二”,将JNI程序放到hadoop上跑通。
这个阶段的尝试我吃了不少苦头,主要是路径问题:hadoop将我写好的jar包分发到每个tasknode上,同时,我们要把.so也分发到相同路径下,并“告诉”tasknode,使得jvm在运行jar包的时候能够找到这个动态库。
还是按顺序说。写hadoop程序。
map函数:
public static class MapTestJni extends Mapper<Writable, Text, Text, Text> { protected String s; protected void setup(Context context) throws IOException, InterruptedException { s = FakeSegmentForJni.SegmentALine("jni-value"); } protected void map(Writable key, Text value, Context context) throws IOException, InterruptedException { // the format of input value is: // mcid totaltimes item1 item2(itemkey=itemvalue) context.write(new Text("key"), new Text(s.toString())); } }
在setup函数中,调用动态库初始化了一个字符串。这个字符串s的值应该是“jni-value--copy that”(参考上一篇动态库的实现)。在map函数中,简单的输出这个字符串。
reduce函数:
public static class ReduceTestJni extends Reducer<Text, Text, Text, Text> { protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String outString = ""; for (Text value: values) { outString = value.toString(); } context.write(key, new Text(outString)); } }
reduce函数也很简单,直接输出这个字符串。
控制函数:先列出来正确的代码吧,然后再吐苦水。
public void runTestJni (String[] args) throws Exception { // the configuration Configuration conf = new Configuration(); GenericOptionsParser goparser = new GenericOptionsParser(conf, args); String otherargs [] = goparser.getRemainingArgs(); // the job Job job; job = new Job(conf, "@here-TestFakeSegmentForJni-hadoopJni"); job.setJarByClass(TestFakeSegmentForJni.class); // the mapper job.setMapperClass(MapTestJni.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // the reducer job.setReducerClass(ReduceTestJni.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setNumReduceTasks(1); // the path FileInputFormat.addInputPath(job, new Path(otherargs[1])); FileOutputFormat.setOutputPath(job, new Path(otherargs[2])); job.waitForCompletion(true); }
其中GenericOptionsParser那两行是关键,用来分析和执行hadoop命令中传进来的特殊参数。配合命令行中的命令(下文写),他把动态库分发到tasknode上,路径与jar的执行路径相同。
通常用jar包的形式运行hadoop程序,所需的参数,如:输入路径、输出路径、mapper、combiner、reducer等都可以用Job来设置,不需要额外的参数。命令行中少了这些参数,会显得短很多。尤其hadoop命令行一般都挺长,就很方便。相反地,采用c++ streaming的方式来运行程序的时候,就需要用-input、-output等参数来指定相关参数。不过,在jar包中,除了用Job外,也可以用GenericOptionsParser来解析上述命令行中的参数,只要命令行配合有相应的输入,GenericOptionsParser就可以解析。对于-input、-output等来讲,没有必要这样做。还有一个参数是-files,就是把-files后面的文件(多个文件用逗号间隔)同jar包一起分发到tasknode中,这个参数,刚好可以将我们的动态库分发下去。
“goparser.getRemainingArgs();”这条语句,是在GenericOptionsParser解析完特殊参数之后,获得剩下的参数列表,对于我们来讲,剩下的参数就是main函数所在的类名、输入路径和输出路径,参见下面的命令行。
main函数:
public static void main(String[] args) throws Exception { System.out.println ("In this project, we test jni!\n"); // test jni on linux local /*String s = FakeSegmentForJni.SegmentALine("now we test FakeSegmentForJni"); System.out.print(s);*/ // test jni on hadoop new TestFakeSegmentForJni().runTestJni(args); } // main
这个不用解释了。然后直接打成jar包。
在命令行中提交hadoop任务:
hadoop jar /xxx/TestFakeSegmentForJniHadoop.jar -files /xxx/TestJni/libFakeSegmentForJni.so FakeSegmentForJni.TestFakeSegmentForJni /input/xxx.txt /outputJNI
这个也是关键。说一下几个需要注意的地方吧:
- hadoop jar 命令后面跟随的第一个参数一定是打好的jar包,在本例中是TestFakeSegmentForJniHadoop.jar文件及其路径
- 由于在控制函数中用了GenericOptionsParser,jar包后面就必须紧跟需要设定的参数,这里,我们的参数是“-files /xxx/TestJni/libFakeSegmentForJni.so”,表示把本地路径“/xxx/TestJni/”中的libFakeSegmentForJni.so文件随jar包分发下去。
- 剩下的就比较容易了,分别是main函数所在的类名、输入路径、输出路径
- 运行的时候,因为当前路径是默认搜索路径之一,所以jvm能够找到动态库
接下来,说一下我踩到的“坑”们,不怕丢人哈。
- 我尝试过在控制函数里面设置tasknode的java.library.path属性,语句为“conf.set("java.library.path", ".")”,失败。
- 我尝试过在控制函数里面设置tasknode的jvm的hadoop.native.lib属性,语句为“conf.set("hadoop.native.lib", "true");”,失败。
- 我尝试过在控制函数里面设置tasknode的mapred.job.classpath属性,语句为“conf.set("mapred.job.classpath", "./")”,失败。
- 我尝试过在控制函数里面采用DistributedCache,用来将.so文件分发到tasknode上,语句为“DistributedCache.addFileToClassPath(new Path("/xxx/TestJni/libFakeSegmentForJni.so"), conf);”,失败。
- 我尝试过在class FakeSegmentForJni中用System.setProperty来设置java.library.path,语句为“System.setProperty("java.library.path", "./");”,失败。
- 我尝试了各种可能的路径,失败。
列一个我整理之前的控制函数(片段),如下:
public void runTestJni (String[] args) throws Exception { // the configuration Configuration conf = new Configuration(); // conf.set("java.library.path", "./Jars"); // DistributedCache.addFileToClassPath(new Path("/xxx/libFakeSegmentForJni.so"), conf); /*conf.set("hadoop.native.lib", "true"); conf.set("java.library.path", "./"); conf.set("mapred.job.classpath", "./"); DistributedCache.createSymlink(conf); DistributedCache.addFileToClassPath(new Path("/xxx/libFakeSegmentForJni.so"), conf);*/ GenericOptionsParser goparser = new GenericOptionsParser(conf, args); String otherargs [] = goparser.getRemainingArgs(); // the job Job job; job = new Job(conf, "@xiaojinghui-TestFakeSegmentForJni-hadoopJni"); job.setJarByClass(TestFakeSegmentForJni.class); ......
可以看到,我在"Job job"之前注释的有多乱。这还是删除了一些绝对不靠谱的尝试语句呢。可想而知,我当时的郁闷。
最后提一句,设置java.library.path失败,有两种说法:
- 这是一个bug,至今未解决。大家可以搜搜去,在似乎oracle的一个网站上,有个人遇到了和我一样的问题,似乎是官方人员回答的。
- 这是一个design,当tasknode的jvm一旦运行起来,就没有办法动态更改它的属性了。
参考文献: