现在的位置: 首页 > 综合 > 正文

ProtocolBuffer和lzo技术Hadoop系统上的使用

2014年09月05日 ⁄ 综合 ⁄ 共 13212字 ⁄ 字号 评论关闭

概述

基于hadoop的集群分布式数据处理目前是淘宝搜索中心最重要的数据处理平台,在集群物理条件确定的情况下,有几个方面影响了数据处理的速度。

1、数据大小 (影响磁盘IO和网络IO)

2、数据格式 (影响数据的解析及构造速度)

3、并行度

使用 protocolBuffer + lzo技术,能帮我们做到数据小解析快并行度高这三点, 能帮我们大幅度提高处理的速度。下面详细介绍一下如何编译部署及开发相关代码。

hadoop介绍

请参考 分布式计算开源框架Hadoop介绍 和 官方网站 http://hadoop.apache.org

protocolBuffer介绍

官方网站http://code.google.com/intl/zh-CN/apis/protocolbuffers/docs/overview.html

Google定义的一套数据协议,用于数据的结构化和序列化。 Google绝大部分模块数据交互基于此数据协议。

1、平台无关、语言无关。

2、二进制、数据自描述。

3、提供了完整详细的操作API。

4、高性能 比xml要快20-100倍

5、尺寸小 比xml要小3-10倍 –高可扩展性

6、数据自描述、前后兼容

适用于

1、不同的平台、系统、语言、模块之间高效的数据交互

2、用于构建大型的复杂系统,降低数据层面的耦合度和复杂度

这里要特别着重说的是protocolBuffer是一种数据协议,就像tcp/ip协议一样,只要是遵守此协议的任何系统之间都能高效的进行数据交互。

第二个特别要说的是 数据自描述。 也就是说拿到任何一个protocolBuffer的数据文件,我们不需要任何其他的辅助信息,就能顺利的解析出其中的数据信息。

这2点是最本质的。

google同时提供了一套代码生成工具,能根据用户自定义的.proto文件,生成c++/java/python的 代码,用于调用protocolBuffer的内核API . 给我们使用提供了很大的便利

.proto文件 详细请参考 官方网站 http://code.google.com/intl/zh-CN/apis/protocolbuffers/docs/overview.html

lzo介绍

LZO是一种高压缩比和解压速度极快的编码, 特点是

解压缩速度非常快。

LZO是无损压缩,压缩后的数据能准确还原

lzo是基于block分块的,允许数据被分解成chunk,能够被并行的解压

下面说一下如何,部署编译 hadoop protocolBuffer 和 lzo , 我下面提到的hadoop是基于 0.19.3版本的,需要很多额外的修改源码的工作。 如果你使用的是 hadoop-0.20+ , 就省了这些麻烦的事情了, 可以不用修改代码 直接编译。

系统环境

Linux 2.6.9-78 64位系统

安装编译

安装 lzo

下载 lzo-2.03 http://www.oberhumer.com/opensource/lzo/download/lzo-2.03.tar.gz

解压

1
2
3
4
cdlzo-2.03/
./configureenable-shared
make
sudomake

install

安装成功

1
2
vi /etc/ld.so.conf.d/local.conf #输入内容
/usr/local/lib
sudo/sbin/ldconfig

部署 ant 和 jdk6 jdk5 和 forrest

安装apache-ant-1.7.1

解压

1
2
exportANT_HOME=/home/admin/yewang/apache-ant-1.7.1
export PATH=$PATH:/home/admin/yewang/apache-ant-1.7.1/bin

安装jdk 6 解压到 /home/admin/yewang/jdk1.6.0_13

1
2
exportJAVA_HOME=/home/admin/yewang/jdk1.6.0_13
exportJDK_HOME=/home/admin/yewang/jdk1.6.0_13

安装jdk 5

1
2
3
4
5
6
7
8
9
10
wget
http:
//cds-esd.sun.com/ESD6/JSCDL/jdk/1.5.0_22/jdk-1_5_0_22-linux-amd64.bin
[shell]
解压到/home/admin/yewang/jdk1.5.0_22/
 
安装
forrest
下载
http:
//apache.etoak.com/forrest/apache-forrest-0.8.tar.gz
解压到/home/admin/yewang/apache-forrest-0.8
 
<strong>编译安装
支持本地库的 hadoop-0.19<
/strong>
[shell]wget 
wget http:
//www.poolsaboveground.com/apache/hadoop/core/stable/hadoop-0.19.2.tar.gz

解压

1
cd/home/admin/yewang/hadoop-0.19.2-dev

修改源码:

1
2
vi
src/core/org/apache/hadoop/io/compress/BlockCompressorStream.java
修改为: 
public class BlockCompressorStream extends CompressorStream
1
2
vi  
src/core/org/apache/hadoop/io/compress/BlockDecompressorStream.java
修改为: 
public class BlockDecompressorStream extends DecompressorStream
1
2
3
4
5
6
vi  
src/core/org/apache/hadoop/io/compress/CompressorStream.java
修改为:
public
class CompressorStream extends CompressionOutputStream {
protected
Compressor compressor;
protected
byte[] buffer;
protected
boolean closed = false;
1
2
3
4
5
6
7
8
vi  
src/core/org/apache/hadoop/io/compress/DecompressorStream.java
修改为:
public
class DecompressorStream extends CompressionInputStream {
protected
Decompressor decompressor = null;
protected
byte[] buffer;
protected
boolean eof = false;
protected
boolean closed = false;
public
void checkStream() throws IOException {

开始编译

1
ant
-Dcompile.native=true tar  -Djava5.home=/home/admin/yewang/jdk1.5.0_22/ -Dforrest.home=/home/admin/yewang/apache-forrest-0.8/

整个需要部署的内容已经打包在 build/hadoop-0.19.2-dev.tar.gz 包中

scp hadoop-0.19.2-dev.tar.gz 到集群机器上, 解压到 /home/admin/hadoop_sta/hadoop/ 目录下
修改配置

vi conf/hadoop-site.xml

添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<property>
<name>mapred.child.env</name>
<value>JAVA_LIBRARY_PATH=/home/admin/hadoop_sta/hadoop/lib/native/Linux-amd64-64</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
 
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
<property>
<name>mapred.map.output.compression.codec</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
1
2
3
4
5
6
vi
conf/hadoop-default.xml
修改
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzopCodec</value>
</property>

编译安装 hadoop lzo 本地库及jar

下载 http://download.github.com/kevinweil-hadoop-lzo-3e7c9dc.tar.gz

解压

1
2
3
export
CFLAGS=-m64
export
CXXFLAGS=-m64
ant
compile-native tar

需要 部署的内容在 build/hadoop-lzo-0.4.4.tar.gz 中

copy到集群机器解压

将lib/native/Linux-amd64-64/下所有文件 复制到 /home/admin/hadoop_sta/hadoop/lib/native/Linux-amd64-64 目录下。下载的版本中的jar包 是基于hadoop 0.20+开发的。如果你选用的是 hadoop 0.20+ 的版本 可以直接用 ,名字是 hadoop-lzo-0.44.jar。这个jar包 需要copy 到 集群的每台机器的 hadoop/lib 目录下。如果是 hadoop-0.19 请下载我提供的hadoop-lzo版本
,解压后进行ant 编译即可, 链接是:
https://docs.google.com/leaf id=0B_QnJPSut6_SODAzZmU4YWMtYmQ4Ni00MTNmLTgzMTUtZTg3ZjQzNTgxMzU1&hl=zh_CN (需要翻墙)

安装protocolBuffer

官方网站 http://code.google.com/intl/zh-CN/apis/protocolbuffers/docs/overview.html

下载
http://code.google.com/p/protobuf/downloads/detail
name=protobuf-2.3.0.tar.gz&can=2&q=


解压后 请 参考目录下的readme进行安装

官网里面关于如何使用,讲的很清楚 大家仔细看吧

编译c++版本 , 在hadoop集群的每一个节点上都需要安装, 并且其so的路径 需要加到 /etc/ld.so.conf.d/的配置中

编译java版本, */java/目录下 ,进入参考readme, 编译 ,会生成一个 protobuf-java-2.3.0.jar 的包 , 需要copy到每个 hadoop节点的 hadoop/lib 目录下

编译 python版本,*/python/目录下 ,进入参考readme, 编译 , 在hadoop集群的每一个节点上都需要安装, python版本要2.4以上

安装elephant-bird

下载 http://github.com/kevinweil/elephant-bird

解压

ant

编译生成的jar包 elephant-bird-1.0.jar, 需要copy到集群所有机器的hadoop lib 目录下

1
cpdist/elephant-bird-1.0.jar/home/admin/hadoop_sta/hadoop/lib/

下面讲一下如何使用elephant-bird的代码生成器生成自己的 hadoop+protocolBuffer+lzo的代码。

编写自己的 *.proto 文件

生成代码的命令:

1
2
3
4
5
6
protoc
–proto_path=/home/admin/yewang/kevinweil-elephant-bird-3bdbafa/examples/src/proto
–java_out=/home/admin/yewang/kevinweil-elephant-bird-3bdbafa/examples/src/gen-java
–twadoop_out=/home/admin/yewang/kevinweil-elephant-bird-3bdbafa/examples/src/gen-java/
–proto_path=/home/admin/yewang/kevinweil-elephant-bird-3bdbafa/examples/src/proto
/home/admin/yewang/kevinweil-elephant-bird-3bdbafa/examples/src/proto/address_book.proto

–twadoop_out 指定了2个意义:

1. 生成的hadoopProto***.java 文件放置的路径

2. protoc 要去调用一个外部的脚本 , 名字为 protoc-gen-twadoop 。

注意是根据名字 protoc_gen_ 的前缀 加上 twadoop 组合成一个名字 去调用 protoc-gen-twadoop 脚本的

此处非常诡异,大家使用时一定要注意

protoc-gen-twadoop 的内容为 :

/home/admin/jdk1.6.0_13/bin/java

-cp /home/admin/yewang/kevinweil-elephant-bird-3bdbafa/lib/*:/home/admin/yewang/kevinweil-elephant-bird-3bdbafa/dist/elephant-bird-1.0.jar

com.twitter.elephantbird.proto.HadoopProtoCodeGenerator

config-twadoop.yml
-

注意此处的红色的-不是可有可无的!!!

下载的版本中的jar包 是基于hadoop 0.20+开发的。如果你选用的是 hadoop 0.20+ 的版本 可以直接用 ,名字是 elephant-bird-1.0.jar

如果是 hadoop-0.19 请下载我提供的hadoop-lzo版本 ,解压后进行ant编译即可, 链接是https://docs.google.com/leaf
id=0B_QnJPSut6_SMmJmYzEyMWYtMTJmYi00OGZkLWE5N2QtYzY3M2M3MTAzMjll&hl=zh_CN
 (需要翻墙)

elephant-bird 里面包含了如下的这些类, 我大概说明一下他们的用途

mapred/input/LzoProtobufBlockB64InputFormat.java 读取proto+lzo的数据文件, 做Base64转换成文本输出, 主要是为streaming提供的

mapred/input/LzoProtobufBlockInputFormat.java 读取proto+lzo的数据文件,解析成proto对象, 主要是mapreduce程序使用

mapred/input/ProtobufBlockInputFormat.java 读取proto 的数据文件,解析成proto对象, 主要是mapreduce程序使用

mapred/io/ProtobufBlockReader.java 真正执行读取的类

mapred/io/ProtobufBlockWriter.java 真正执行输出的类

mapred/io/ProtobufWritable.java

mapred/output/LzoProtobufBlockB64OutputFormat.java 获取base64转换后的文本 ,做Base64转换后解析成proto对象,存储成proto+lzo的数据文件,主要是为streaming提供的

mapred/output/LzoProtobufBlockMultiOutputFormat.java 获取proto对象,存储成proto+lzo的数据文件, 可以根据key的值,改变输出part的前缀,一次输出多种命名文件。

mapred/output/LzoProtobufBlockOutputFormat.java 获取proto对象,存储成proto+lzo的数据文件。 主要是mapreduce程序使用

mapred/output/ProtobufBlockOutputFormat.java 获取proto对象,存储成proto的数据文件。 主要是mapreduce程序使用

重启集群 全部配置完成

运行使用

压缩命令:

1
hadoop
jar ~/hadoop_sta/hadoop/contrib/streaming/hadoop-0.19.2-dev-streaming.jar -input /app/aa.txt -output /test-lzo -mapper cat -reducer cat -jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.LzoCodec

给lzo压缩文件加索引:

1
hadoop
jar /home/admin/hadoop_sta/hadoop/lib/hadoop-lzo-0.4.4.jar com.hadoop.compression.lzo.LzoIndexer /test-lzo/

给lzo文件加索引的目的是为了让lzo支持 splitable, 这样hadoop可以并行处理, 所以这一步很关键, 生成的文件后缀.index
我们在 hadoop-lzo-0.4.4.jar 另一个mapreduce版本的 创建索引的工具 DistributedLzoIndexer

解压命令:

1
2
3
hadoop
jar ~/hadoop_sta/hadoop/contrib/streaming/hadoop-0.19.2-dev-streaming.jar
-inputformat
org.apache.hadoop.mapred.LzoTextInputFormat
-input
/test-lzo -output /test-txt -mapper cat -reducer cat

如何编写 读取 写出 protocolBuffer + lzo 文件的mapreduce程序

编写.proto文件

具体语法请参考protocolBuffer网站

例子:

1
2
3
4
5
6
7
8
9
<package
com.taobao.proto;
message
auction
{
  optional
string id = 1;
  optional
bytes  title = 2;
  optional
string user = 3;
  optional
string pict_url = 4;
  optional
uint32 category = 5;
}

使用protoc程序生成java代码

protoc –java_out=. auction.proto

生成的文件是 com/taobao/proto/Auction.java 文件

实现定制的inputFormat和outputFormat

主要是下面的3个类, 相关的代码 在 我提供elephant-bird下载包的 taobao 目录下都有

AuctionProtobufWritable.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
packagecom.taobao.proto.mapred.io;
 
importcom.taobao.proto.Auction.auction;
importcom.twitter.elephantbird.mapred.io.ProtobufWritable;
importcom.twitter.elephantbird.util.TypeRef;
 
publicclass

AuctionProtobufWritable
extendsProtobufWritable<auction>
{
 
        publicAuctionProtobufWritable()
{
 
                super(newTypeRef<auction>(){});
 
        }
 
}

AuctionLzoProtobufBlockInputFormat.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
AuctionLzoProtobufBlockInputFormat.java
 
packagecom.taobao.proto.mapred.input;
 
importcom.taobao.proto.Auction.auction;
importcom.twitter.elephantbird.mapred.input.LzoProtobufBlockInputFormat;
importcom.taobao.proto.mapred.io.AuctionProtobufWritable;
importcom.twitter.elephantbird.util.TypeRef;
 
publicclass

AuctionLzoProtobufBlockInputFormat
extendsLzoProtobufBlockInputFormat<auction,
AuctionProtobufWritab
le>
 
{
        publicAuctionLzoProtobufBlockInputFormat()
        {
 
                setTypeRef(newTypeRef<auction>(){});
 
                setProtobufWritable(newAuctionProtobufWritable());
        }
}

AuctionLzoProtobufBlockOutputFormat.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
packagecom.taobao.proto.mapred.output;
 
importcom.taobao.proto.Auction.auction;
importcom.twitter.elephantbird.mapred.output.LzoProtobufBlockOutputFormat;
importcom.taobao.proto.mapred.io.AuctionProtobufWritable;
importcom.twitter.elephantbird.util.TypeRef;
 
publicclass

AuctionLzoProtobufBlockOutputFormat
extendsLzoProtobufBlockOutputFormat<auction,
AuctionProtobufWrit
able>
 
{
        publicAuctionLzoProtobufBlockOutputFormat()
        {
 
                setTypeRef(newTypeRef<auction>(){});
        }
}

编写mapreduce程序

job 的设置:

1
2
3
4
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(AuctionProtobufWritable.class);
job.setInputFormat(AuctionLzoProtobufBlockInputFormat.class);
job.setOutputFormat(AuctionLzoProtobufBlockOutputFormat.class);

mapper和reduce类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
importcom.google.protobuf.ByteString;
importcom.taobao.proto.Auction;
importcom.taobao.proto.Auction.auction;
importcom.taobao.proto.mapred.io.*;
importcom.taobao.proto.mapred.input.*;
importcom.taobao.proto.mapred.output.*;
 
publicstatic

class

proto2protoMapper
extendsMapReduceBase
implements   Mapper<LongWritable,
AuctionProtobufWrit
able,
NullWritable, AuctionProtobufWritable>
{
 
        @Override
        publicvoid

map(LongWritable key, AuctionProtobufWritable value,
 
                        OutputCollector<NullWritable,
AuctionProtobufWritable> outCollector, Reporter reporter)
 
        throwsIOException
 
        {
 
                auction
pa = value.get();
 
                auction.Builder
builder = auction.newBuilder();
 
                if(pa.hasId())             
builder.setId(pa.getId());
 
                if(pa.hasTitle())          
builder.setTitle(pa.getTitle());
 
                if(pa.hasUser())           
builder.setUser(pa.getUser());
 
                ......
 
                AuctionProtobufWritable
pw =
newAuctionProtobufWritable();
 
                pw.set(builder.build());
 
                outCollector.collect(NullWritable.get(), 
pw);
 
        }
}

编译成jar包

ant 编译

如何运行

1
hadoop
jar dist/taobao-proto-auction-1.0.jar com.taobao.proto.proto2proto /yewang/xml2proto /yewang/proto2proto

streaming调用方式,和map reduce处理程序 python样例

编写.proto文件

复用上面的 例子一样

使用protoc 生成 python代码

1
protoc
–python_out=../python/ auction.proto

生成的文件是 auction_pb2.py

编写map reduce 脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/home/admin/Python/bin/python
#
-*- coding: utf-8 -*-
#
Filename: reducer.py
#
Author: yewang@taobao.com clm971910@gmail.com
importauction_pb2
importsys
importbase64
 
pa=auction_pb2.auction()
f=open("/dev/stdin","rb")
 
whileTrue:
        line=f.readline();
 
        iflen(line)==0:
                break;
 
        #
处理掉换行符 , (streaming.jar 帮我们加的)
        line=line.strip();
 
        #
切分出keyValue, (streaming.jar 帮我们加的)
        keyValue=line.split("\t");
 
        #
base64 解码
        value=base64.standard_b64decode(keyValue[1]);
 
        #
解析 成 proto 对象
        pa.ParseFromString(value);
 
        #
输出部分内容, 需要带上key, \t分隔,用于选择合适的reducer
        #
print "1\t" + pa.title;
        #
如果想要输出proto , 需要将proto对象转换成字符串, 然后base64编码
        print"1\t"

+

base64.standard_b64encode(value);
 
f.close()

如何运行

1
hadoop
jar -libjars dist/taobao-proto-auction-1.0.jar /home/admin/hadoop_sta/hadoop/contrib/streaming/hadoop-0.19.2-dev-streaming.jar  -input /yewang/testproto -output /yewang/testStreaming4  -file /home/admin/yewang/elephant-bird/taobao/src/python/auction_pb2.py
-file /home/admin/yewang/elephant-bird/taobao/src/python/mapper.py -file  /home/admin/yewang/elephant-bird/taobao/src/python/reducer.py  -inputformat com.taobao.proto.mapred.input.AuctionLzoProtobufBlockB64InputFormat -outputformat com.taobao.proto.mapred.output.AuctionLzoProtobufBlockB64OutputFormat 
-reducer reducer.py -mapper mapper.py


测试结果

测试环境:

3 台虚拟机 (1G 内存 单核2G)

测试数据:

宝贝xml, 1.3 G

数据大小对比:

处理前: 1.3G 处理后: 285 M

数据减少为原始数据的 22%

处理速度对比

xml 读取加解析 1m26.595s

proto+lzo 读取加解析 0m21.833s

xml 读取加解析然后构造成xml输出 2m45.949s

proto+lzo 读取加解析然后构造成proto输出 0m40.267s

测试结论:

从测试的结果来看, 性能的提升比较明显。

原文地址:http://www.searchtb.com/2010/09/pb-lzo-used-in-hadoop.html

抱歉!评论已关闭.