概述
基于hadoop的集群分布式数据处理目前是淘宝搜索中心最重要的数据处理平台,在集群物理条件确定的情况下,有几个方面影响了数据处理的速度。
1、数据大小 (影响磁盘IO和网络IO)
2、数据格式 (影响数据的解析及构造速度)
3、并行度
使用 protocolBuffer + lzo技术,能帮我们做到数据小解析快并行度高这三点, 能帮我们大幅度提高处理的速度。下面详细介绍一下如何编译部署及开发相关代码。
hadoop介绍
请参考 分布式计算开源框架Hadoop介绍 和 官方网站 http://hadoop.apache.org
protocolBuffer介绍
官方网站http://code.google.com/intl/zh-CN/apis/protocolbuffers/docs/overview.html
Google定义的一套数据协议,用于数据的结构化和序列化。 Google绝大部分模块数据交互基于此数据协议。
1、平台无关、语言无关。
2、二进制、数据自描述。
3、提供了完整详细的操作API。
4、高性能 比xml要快20-100倍
5、尺寸小 比xml要小3-10倍 –高可扩展性
6、数据自描述、前后兼容
适用于
1、不同的平台、系统、语言、模块之间高效的数据交互
2、用于构建大型的复杂系统,降低数据层面的耦合度和复杂度
这里要特别着重说的是protocolBuffer是一种数据协议,就像tcp/ip协议一样,只要是遵守此协议的任何系统之间都能高效的进行数据交互。
第二个特别要说的是 数据自描述。 也就是说拿到任何一个protocolBuffer的数据文件,我们不需要任何其他的辅助信息,就能顺利的解析出其中的数据信息。
这2点是最本质的。
google同时提供了一套代码生成工具,能根据用户自定义的.proto文件,生成c++/java/python的 代码,用于调用protocolBuffer的内核API . 给我们使用提供了很大的便利
.proto文件 详细请参考 官方网站 http://code.google.com/intl/zh-CN/apis/protocolbuffers/docs/overview.html
lzo介绍
LZO是一种高压缩比和解压速度极快的编码, 特点是
解压缩速度非常快。
LZO是无损压缩,压缩后的数据能准确还原
lzo是基于block分块的,允许数据被分解成chunk,能够被并行的解压
下面说一下如何,部署编译 hadoop protocolBuffer 和 lzo , 我下面提到的hadoop是基于 0.19.3版本的,需要很多额外的修改源码的工作。 如果你使用的是 hadoop-0.20+ , 就省了这些麻烦的事情了, 可以不用修改代码 直接编译。
系统环境
Linux 2.6.9-78 64位系统
安装编译
安装 lzo
下载 lzo-2.03 http://www.oberhumer.com/opensource/lzo/download/lzo-2.03.tar.gz
解压
1
2
3
4
|
cd lzo-2.03/ . /configure – enable -shared make sudo make
|
安装成功
1
2
|
vi /etc/ld .so.conf.d /local .conf #输入内容 sudo /sbin/ldconfig |
部署 ant 和 jdk6 jdk5 和 forrest
安装apache-ant-1.7.1
解压
1
2
|
export ANT_HOME= /home/admin/yewang/apache-ant-1 .7.1 export PATH=$PATH: /home/admin/yewang/apache-ant-1 .7.1 /bin |
安装jdk 6 解压到 /home/admin/yewang/jdk1.6.0_13
1
2
|
export JAVA_HOME= /home/admin/yewang/jdk1 .6.0_13 export JDK_HOME= /home/admin/yewang/jdk1 .6.0_13 |
安装jdk 5
1
2
3
4
5
6
7
8
9
10
|
wget //cds-esd .sun.com /ESD6/JSCDL/jdk/1 .5.0_22 /jdk-1_5_0_22-linux-amd64 .bin [shell] 解压到 /home/admin/yewang/jdk1 .5.0_22/ 安装 下载 //apache .etoak.com /forrest/apache-forrest-0 .8. tar .gz 解压到 /home/admin/yewang/apache-forrest-0 .8 <strong>编译安装 /strong > [shell]wget //www .poolsaboveground.com /apache/hadoop/core/stable/hadoop-0 .19.2. tar .gz |
解压
1
|
cd /home/admin/yewang/hadoop-0 .19.2-dev |
修改源码:
1
2
|
vi 修改为: |
1
2
|
vi 修改为: |
1
2
3
4
5
6
|
vi 修改为: public protected protected protected |
1
2
3
4
5
6
7
8
|
vi 修改为: public protected protected protected protected public |
开始编译
1
|
ant |
整个需要部署的内容已经打包在 build/hadoop-0.19.2-dev.tar.gz 包中
scp hadoop-0.19.2-dev.tar.gz 到集群机器上, 解压到 /home/admin/hadoop_sta/hadoop/ 目录下
修改配置
vi conf/hadoop-site.xml
添加
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
<property> <name>mapred.child.env</name> <value>JAVA_LIBRARY_PATH=/home/admin/hadoop_sta/hadoop/lib/native/Linux-amd64-64</value> </property> <property> <name>io.compression.codec.lzo.class</name> <value>com.hadoop.compression.lzo.LzoCodec</value> </property> <property> <name>mapred.compress.map.output</name> <value>true</value> </property> <property> <name>mapred.map.output.compression.codec</name> <value>com.hadoop.compression.lzo.LzoCodec</value> </property> |
1
2
3
4
5
6
|
vi 修改 <property> <name>io.compression.codecs</name> <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzopCodec</value> </property> |
编译安装 hadoop lzo 本地库及jar
下载 http://download.github.com/kevinweil-hadoop-lzo-3e7c9dc.tar.gz
解压
1
2
3
|
export export ant |
需要 部署的内容在 build/hadoop-lzo-0.4.4.tar.gz 中
copy到集群机器解压
将lib/native/Linux-amd64-64/下所有文件 复制到 /home/admin/hadoop_sta/hadoop/lib/native/Linux-amd64-64 目录下。下载的版本中的jar包 是基于hadoop 0.20+开发的。如果你选用的是 hadoop 0.20+ 的版本 可以直接用 ,名字是 hadoop-lzo-0.44.jar。这个jar包 需要copy 到 集群的每台机器的 hadoop/lib 目录下。如果是 hadoop-0.19 请下载我提供的hadoop-lzo版本
,解压后进行ant 编译即可, 链接是:
https://docs.google.com/leaf id=0B_QnJPSut6_SODAzZmU4YWMtYmQ4Ni00MTNmLTgzMTUtZTg3ZjQzNTgxMzU1&hl=zh_CN (需要翻墙)
安装protocolBuffer
官方网站 http://code.google.com/intl/zh-CN/apis/protocolbuffers/docs/overview.html
下载
http://code.google.com/p/protobuf/downloads/detail
name=protobuf-2.3.0.tar.gz&can=2&q=
解压后 请 参考目录下的readme进行安装
官网里面关于如何使用,讲的很清楚 大家仔细看吧
编译c++版本 , 在hadoop集群的每一个节点上都需要安装, 并且其so的路径 需要加到 /etc/ld.so.conf.d/的配置中
编译java版本, */java/目录下 ,进入参考readme, 编译 ,会生成一个 protobuf-java-2.3.0.jar 的包 , 需要copy到每个 hadoop节点的 hadoop/lib 目录下
编译 python版本,*/python/目录下 ,进入参考readme, 编译 , 在hadoop集群的每一个节点上都需要安装, python版本要2.4以上
安装elephant-bird
下载 http://github.com/kevinweil/elephant-bird
解压
ant
编译生成的jar包 elephant-bird-1.0.jar, 需要copy到集群所有机器的hadoop lib 目录下
1
|
cp dist /elephant-bird-1 .0.jar /home/admin/hadoop_sta/hadoop/lib/ |
下面讲一下如何使用elephant-bird的代码生成器生成自己的 hadoop+protocolBuffer+lzo的代码。
编写自己的 *.proto 文件
生成代码的命令:
1
2
3
4
5
6
|
protoc –proto_path=/home/admin/yewang/kevinweil-elephant-bird-3bdbafa/examples/src/proto –java_out=/home/admin/yewang/kevinweil-elephant-bird-3bdbafa/examples/src/gen-java –twadoop_out=/home/admin/yewang/kevinweil-elephant-bird-3bdbafa/examples/src/gen-java/ –proto_path=/home/admin/yewang/kevinweil-elephant-bird-3bdbafa/examples/src/proto /home/admin/yewang/kevinweil-elephant-bird-3bdbafa/examples/src/proto/address_book.proto |
–twadoop_out 指定了2个意义:
1. 生成的hadoopProto***.java 文件放置的路径
2. protoc 要去调用一个外部的脚本 , 名字为 protoc-gen-twadoop 。
注意是根据名字 protoc_gen_ 的前缀 加上 twadoop 组合成一个名字 去调用 protoc-gen-twadoop 脚本的
此处非常诡异,大家使用时一定要注意
protoc-gen-twadoop 的内容为 :
/home/admin/jdk1.6.0_13/bin/java
-cp /home/admin/yewang/kevinweil-elephant-bird-3bdbafa/lib/*:/home/admin/yewang/kevinweil-elephant-bird-3bdbafa/dist/elephant-bird-1.0.jar
com.twitter.elephantbird.proto.HadoopProtoCodeGenerator
config-twadoop.yml
-
注意此处的红色的-不是可有可无的!!!
下载的版本中的jar包 是基于hadoop 0.20+开发的。如果你选用的是 hadoop 0.20+ 的版本 可以直接用 ,名字是 elephant-bird-1.0.jar
如果是 hadoop-0.19 请下载我提供的hadoop-lzo版本 ,解压后进行ant编译即可, 链接是https://docs.google.com/leaf
id=0B_QnJPSut6_SMmJmYzEyMWYtMTJmYi00OGZkLWE5N2QtYzY3M2M3MTAzMjll&hl=zh_CN (需要翻墙)
elephant-bird 里面包含了如下的这些类, 我大概说明一下他们的用途
mapred/input/LzoProtobufBlockB64InputFormat.java 读取proto+lzo的数据文件, 做Base64转换成文本输出, 主要是为streaming提供的
mapred/input/LzoProtobufBlockInputFormat.java 读取proto+lzo的数据文件,解析成proto对象, 主要是mapreduce程序使用
mapred/input/ProtobufBlockInputFormat.java 读取proto 的数据文件,解析成proto对象, 主要是mapreduce程序使用
mapred/io/ProtobufBlockReader.java 真正执行读取的类
mapred/io/ProtobufBlockWriter.java 真正执行输出的类
mapred/io/ProtobufWritable.java
mapred/output/LzoProtobufBlockB64OutputFormat.java 获取base64转换后的文本 ,做Base64转换后解析成proto对象,存储成proto+lzo的数据文件,主要是为streaming提供的
mapred/output/LzoProtobufBlockMultiOutputFormat.java 获取proto对象,存储成proto+lzo的数据文件, 可以根据key的值,改变输出part的前缀,一次输出多种命名文件。
mapred/output/LzoProtobufBlockOutputFormat.java 获取proto对象,存储成proto+lzo的数据文件。 主要是mapreduce程序使用
mapred/output/ProtobufBlockOutputFormat.java 获取proto对象,存储成proto的数据文件。 主要是mapreduce程序使用
重启集群 全部配置完成
运行使用
压缩命令:
1
|
hadoop |
给lzo压缩文件加索引:
1
|
hadoop |
给lzo文件加索引的目的是为了让lzo支持 splitable, 这样hadoop可以并行处理, 所以这一步很关键, 生成的文件后缀.index
我们在 hadoop-lzo-0.4.4.jar 另一个mapreduce版本的 创建索引的工具 DistributedLzoIndexer
解压命令:
1
2
3
|
hadoop -inputformat -input |
如何编写 读取 写出 protocolBuffer + lzo 文件的mapreduce程序
编写.proto文件
具体语法请参考protocolBuffer网站
例子:
1
2
3
4
5
6
7
8
9
|
<package message { optional optional optional optional optional } |
使用protoc程序生成java代码
protoc –java_out=. auction.proto
生成的文件是 com/taobao/proto/Auction.java 文件
实现定制的inputFormat和outputFormat
主要是下面的3个类, 相关的代码 在 我提供elephant-bird下载包的 taobao 目录下都有
AuctionProtobufWritable.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
package com.taobao.proto.mapred.io; import com.taobao.proto.Auction.auction; import com.twitter.elephantbird.mapred.io.ProtobufWritable; import com.twitter.elephantbird.util.TypeRef; public class
extends ProtobufWritable<auction> public AuctionProtobufWritable() super ( new TypeRef<auction>(){}); } } |
AuctionLzoProtobufBlockInputFormat.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
AuctionLzoProtobufBlockInputFormat.java package com.taobao.proto.mapred.input; import com.taobao.proto.Auction.auction; import com.twitter.elephantbird.mapred.input.LzoProtobufBlockInputFormat; import com.taobao.proto.mapred.io.AuctionProtobufWritable; import com.twitter.elephantbird.util.TypeRef; public class
extends LzoProtobufBlockInputFormat<auction, le> { public AuctionLzoProtobufBlockInputFormat() { setTypeRef( new TypeRef<auction>(){}); setProtobufWritable( new AuctionProtobufWritable()); } } |
AuctionLzoProtobufBlockOutputFormat.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
package com.taobao.proto.mapred.output; import com.taobao.proto.Auction.auction; import com.twitter.elephantbird.mapred.output.LzoProtobufBlockOutputFormat; import com.taobao.proto.mapred.io.AuctionProtobufWritable; import com.twitter.elephantbird.util.TypeRef; public class
extends LzoProtobufBlockOutputFormat<auction, able> { public AuctionLzoProtobufBlockOutputFormat() { setTypeRef( new TypeRef<auction>(){}); } } |
编写mapreduce程序
job 的设置:
1
2
3
4
|
job.setOutputKeyClass(NullWritable. class ); job.setOutputValueClass(AuctionProtobufWritable. class ); job.setInputFormat(AuctionLzoProtobufBlockInputFormat. class ); job.setOutputFormat(AuctionLzoProtobufBlockOutputFormat. class ); |
mapper和reduce类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import com.google.protobuf.ByteString; import com.taobao.proto.Auction; import com.taobao.proto.Auction.auction; import com.taobao.proto.mapred.io.*; import com.taobao.proto.mapred.input.*; import com.taobao.proto.mapred.output.*; public static
extends MapReduceBase implements Mapper<LongWritable, able, { @Override public void
OutputCollector<NullWritable, throws IOException { auction auction.Builder if (pa.hasId()) if (pa.hasTitle()) if (pa.hasUser()) ...... AuctionProtobufWritable new AuctionProtobufWritable(); pw.set(builder.build()); outCollector.collect(NullWritable.get(), } } |
编译成jar包
ant 编译
如何运行
1
|
hadoop |
streaming调用方式,和map reduce处理程序 python样例
编写.proto文件
复用上面的 例子一样
使用protoc 生成 python代码
1
|
protoc |
生成的文件是 auction_pb2.py
编写map reduce 脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
#!/home/admin/Python/bin/python # # # import auction_pb2 import sys import base64 pa = auction_pb2.auction() f = open ( "/dev/stdin" , "rb" ) while True : line = f.readline(); if len (line) = = 0 : break ; # line = line.strip(); # keyValue = line.split( "\t" ); # value = base64.standard_b64decode(keyValue[ 1 ]); # pa.ParseFromString(value); # # # print "1\t"
f.close() |
如何运行
1
|
hadoop |
测试结果
测试环境:
3 台虚拟机 (1G 内存 单核2G)
测试数据:
宝贝xml, 1.3 G
数据大小对比:
处理前: 1.3G 处理后: 285 M
数据减少为原始数据的 22%
处理速度对比
xml 读取加解析 1m26.595s
proto+lzo 读取加解析 0m21.833s
xml 读取加解析然后构造成xml输出 2m45.949s
proto+lzo 读取加解析然后构造成proto输出 0m40.267s
测试结论:
从测试的结果来看, 性能的提升比较明显。
原文地址:http://www.searchtb.com/2010/09/pb-lzo-used-in-hadoop.html