Sometimes the input to a Hadoop job is not line-based text; instead, we want to feed it custom structured data. In that case, the usual approach is to serialize the structured records into a byte stream with a serialization tool and store them on disk; the mapper then reads the bytes back and deserializes them to recover the original data.
We use Google protobuf as the tool for passing this structured information.
First, define a person.proto file:
```proto
syntax = "proto2";

option java_package = "com.hackerlight.proto";
option java_outer_classname = "ProtobufMessage";

message Person {
  required string name  = 1;
  required int32  id    = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME   = 1;
    WORK   = 2;
  }

  message PhoneNumber {
    required string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phone = 4;

  message CountryInfo {
    required string name   = 1;
    required string code   = 2;
    optional int32  number = 3;
  }
}
```
Generate the protobuf Java code:
```bash
protoc --java_out=./ person.proto
```
The generated Java code is placed under com/hackerlight/proto in the current directory.
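To compile the generated class, the protobuf Java runtime jar matching your protoc version must be on the classpath. A minimal sketch, assuming protoc 2.5.0 and an illustrative jar path:

```bash
# The protobuf-java version should match the protoc used above; the path is illustrative.
javac -cp protobuf-java-2.5.0.jar com/hackerlight/proto/ProtobufMessage.java
```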
The following code writes the serialized records into a SequenceFile:
```java
package com.hackerlight.writer;

import java.net.URI;

import com.hackerlight.proto.ProtobufMessage.Person;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;

public class Writer {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);

    // Build a Person message and serialize it to a byte array.
    Person.Builder builder = Person.newBuilder();
    builder.setEmail("moon@hackerlight.com");
    builder.setId(1024);
    builder.setName("Moon");
    Person.PhoneNumber.Builder p = Person.PhoneNumber.newBuilder();
    p.setNumber("18900000000");
    builder.addPhone(p);
    byte[] serialized = builder.build().toByteArray();

    // Each record: an IntWritable key and the protobuf bytes as a BytesWritable value.
    IntWritable key = new IntWritable();
    BytesWritable value = new BytesWritable();
    SequenceFile.Writer writer = null;
    try {
      writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
      for (int i = 0; i < 100; ++i) {
        key.set(i);
        value.set(serialized, 0, serialized.length);
        writer.append(key, value);
      }
    } finally {
      IOUtils.closeStream(writer);
    }
  }
}
```
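After running the writer, the output can be sanity-checked with `hadoop fs -text`, which understands the SequenceFile format; the BytesWritable values print as hex. The path below stands in for whatever was passed as args[0]:

```bash
# Path is illustrative: use the URI you gave the writer.
hadoop fs -text /path/to/person.seq | head -n 3
```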
A corresponding reader deserializes the structured records that were written:
```java
package com.hackerlight.reader;

import java.net.URI;

import com.hackerlight.proto.ProtobufMessage.Person;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class Reader {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path path = new Path(uri);

    SequenceFile.Reader reader = null;
    try {
      reader = new SequenceFile.Reader(fs, path, conf);
      Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
      while (reader.next(key, value)) {
        System.out.println("key : " + key);
        // getBytes() returns the whole backing buffer, which may be longer
        // than the record, so parse only the first getLength() bytes.
        Person person = Person.PARSER.parseFrom(value.getBytes(), 0, value.getLength());
        System.out.println("name : " + person.getName());
        System.out.println("email : " + person.getEmail());
      }
    } finally {
      IOUtils.closeStream(reader);
    }
  }
}
```
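To run the reader, both the Hadoop jars and the protobuf runtime must be on the classpath; one way is to export HADOOP_CLASSPATH before invoking `hadoop jar`. The jar and path names below are illustrative:

```bash
# Jar names and the input path are illustrative.
export HADOOP_CLASSPATH=protobuf-java-2.5.0.jar
hadoop jar seqfile-demo.jar com.hackerlight.reader.Reader /path/to/person.seq
```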
With Hadoop Pipes, the records can be read like this:
```cpp
#include <cassert>
#include <string>

#include "hadoop/Pipes.hh"
#include "person.pb.h"  // generated by protoc; in C++ the Person class is top-level

class MyMap : public HadoopPipes::Mapper {
 public:
  MyMap(HadoopPipes::TaskContext& context) {}

  void map(HadoopPipes::MapContext& context) {
    const std::string& key = context.getInputKey();
    const std::string& value = context.getInputValue();
    // The value is the serialized Person written into the SequenceFile.
    Person person;
    bool ok = person.ParseFromString(value);
    assert(ok);  // parse outside assert() so it still runs under NDEBUG
  }
};
```
When launching the job with the hadoop command, be sure to pass -inputformat org.apache.hadoop.mapred.SequenceFileInputFormat, otherwise the input is treated as plain text.
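A full Pipes invocation might look like the following sketch; the compiled binary must first be copied into HDFS, and all paths here are illustrative:

```bash
# -program points at the mapper binary in HDFS; input/output paths are illustrative.
hadoop pipes \
  -D hadoop.pipes.java.recordreader=true \
  -D hadoop.pipes.java.recordwriter=true \
  -inputformat org.apache.hadoop.mapred.SequenceFileInputFormat \
  -input /path/to/person.seq \
  -output /path/to/output \
  -program /bin/mymap
```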