Today I came across some data online that is well suited to analysis with MapReduce. A record looks like this:
[**] [1:538:15] NETBIOS SMB IPC$ unicode share access [**]
[Classification: Generic Protocol Command Decode] [Priority: 3]
09/04-17:53:56.363811 168.150.177.165:1051 -> 168.150.177.166:139
TCP TTL:128 TOS:0x0 ID:4000 IpLen:20 DgmLen:138 DF
***AP*** Seq: 0x2E589B8 Ack: 0x642D47F9 Win: 0x4241 TcpLen: 20

[**] [1:1917:6] SCAN UPnP service discover attempt [**]
[Classification: Detection of a Network Scan] [Priority: 3]
09/04-17:53:56.385573 168.150.177.164:1032 -> 239.255.255.250:1900
UDP TTL:1 TOS:0x0 ID:80 IpLen:20 DgmLen:161
Len: 133
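As you can see, records like these cannot be handled with the system default TextInputFormat.class, which treats every line as a separate record.
So we have to write our own reader class. The format above shows the rule: records are separated by a blank line, and all the lines between two blank lines form one record (spanning several lines). Without further ado, here is the code: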
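import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

// The original post omits the class declaration. The name AlertRecordReader
// and the commons-logging LOG field are assumptions added so the code compiles.
public class AlertRecordReader extends RecordReader<IntWritable, Text> {
    private static final Log LOG = LogFactory.getLog(AlertRecordReader.class);

    private int pos;                 // 1-based number of the next line to read
    private boolean more;            // false once the input is exhausted
    private LineReader in;
    private int maxLineLength;
    private IntWritable key = null;  // line number at which the current record starts
    private Text value1 = null;      // buffer for the line just read
    private String value = "";       // current record: its lines joined by spaces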

    @Override
    public void initialize(InputSplit genericSplit,
            TaskAttemptContext context) throws IOException {
        pos = 1;
        more = true;
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);
        final Path file = split.getPath();
        // Open the file and read it from the beginning. The split's start and
        // length are ignored, so this reader only gives correct results when
        // each file is handed to it as a single split.
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        in = new LineReader(fileIn, job);
    }
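
    @Override
    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new IntWritable();
        }
        if (value1 == null) {
            value1 = new Text();
        }
        StringBuilder record = new StringBuilder();
        while (true) {
            // readLine returns the number of bytes consumed, line terminator
            // included; 0 means end of file.
            int newSize = in.readLine(value1, maxLineLength, maxLineLength);
            if (newSize == 0) {
                break;
            }
            int lineNumber = pos++;
            if (value1.getLength() == 0) {
                // A blank line ends the current record. Testing the text length
                // instead of newSize == 1 also handles "\r\n" line endings.
                if (record.length() > 0) {
                    break;
                }
                // Skip blank lines that come before the record starts.
                continue;
            }
            if (newSize < maxLineLength) {
                if (record.length() == 0) {
                    // The key is the line number of the record's first line.
                    key.set(lineNumber);
                }
                // The line belongs to the current record; follow it with a space.
                record.append(value1.toString()).append(' ');
            } else {
                // Line too long: leave it out of the record.
                LOG.info("Skipped line " + lineNumber + " of size " + newSize);
            }
        }
        value = record.toString();
        if (value.isEmpty()) {
            // Nothing left to read.
            key = null;
            more = false;
            return false;
        }
        return true;
    }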
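
    @Override
    public IntWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return new Text(value);
    }

    /**
     * Get the progress within the split. Only "not finished" (0.0) and
     * "finished" (1.0) are reported, because no byte offsets are tracked.
     */
    @Override
    public float getProgress() {
        // Progress must lie between 0.0 and 1.0.
        return more ? 0.0f : 1.0f;
    }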

    @Override
    public synchronized void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}
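With the class above, the lines of one alert (five per alert in the sample) are joined into a single record, and a blank line marks the end of each record.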
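
The grouping rule can be tried out without a Hadoop cluster. What follows is a minimal sketch in plain Java, not the code from this post: the class BlankLineGrouper and its group method are hypothetical names, it works on an in-memory list of lines instead of a LineReader, and unlike the reader it leaves no trailing space at the end of a record. As in the reader, the key is the 1-based line number of the record's first line.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original post: it applies the same
// blank-line grouping rule as the RecordReader to a list of lines in memory.
class BlankLineGrouper {

    // Returns the records keyed by the 1-based line number of their first line.
    static Map<Integer, String> group(List<String> lines) {
        Map<Integer, String> records = new LinkedHashMap<>();
        StringBuilder current = new StringBuilder();
        int startLine = 0;
        for (int i = 0; i < lines.size(); i++) {
            String line = lines.get(i);
            if (line.isEmpty()) {
                // A blank line closes the record collected so far, if any.
                if (current.length() > 0) {
                    records.put(startLine, current.toString());
                    current.setLength(0);
                }
                continue;
            }
            if (current.length() == 0) {
                // First line of a new record: remember where it starts.
                startLine = i + 1;
            } else {
                // Later lines are joined to the record with a single space.
                current.append(' ');
            }
            current.append(line);
        }
        // The last record may not be followed by a blank line.
        if (current.length() > 0) {
            records.put(startLine, current.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        // Shortened versions of the two sample alerts, separated by a blank line.
        List<String> lines = Arrays.asList(
            "[**] [1:538:15] NETBIOS SMB IPC$ unicode share access [**]",
            "[Classification: Generic Protocol Command Decode] [Priority: 3]",
            "",
            "[**] [1:1917:6] SCAN UPnP service discover attempt [**]",
            "Len: 133");
        group(lines).forEach((k, v) -> System.out.println(k + "\t" + v));
    }
}
```

Running main prints one line per alert, keyed by the line on which the alert starts. Consecutive blank lines produce no empty records, which matches how the reader skips blank lines that precede a record.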