1.概述
小文件是指文件size小于HDFS上block大小的文件。这样的文件会给hadoop的扩展性和性能带来严重问题。首先,在HDFS中,任何block,文件或者目录在内存中均以对象的形式存储,每个对象约占150byte,如果有1千万个小文件,每个文件占用一个block,则NameNode大约需要2G空间。如果存储一亿个文件,则NameNode需要20G空间。这样NameNode内存容量严重制约了集群的扩展。其次,访问大量小文件速度远远小于访问几个大文件。HDFS最初是为流式访问大文件开发的,如果访问大量小文件,需要不断的从一个DataNode跳到另外一个DataNode,严重影响性能。最后,处理大量小文件速度远远小于处理同等大小的大文件的速度。每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上。
2.HDFS文件读写流程
在正式介绍HDFS小文件存储方案之前,我们先介绍一下当前HDFS上文件存取的基本流程。
2.1 读文件流程
A.client端发送读文件请求给NameNode,如果文件不存在,返回错误信息,否则,将该文件对应的block机器所在DataNode位置发送给client。
B.client收到文件位置信息后,与不同DataNode建立socket连接并行获取数据。
2.2 写文件流程
A.client端发送写文件请求,NameNode检查文件是否存在,如果已经存在,直接返回错误信息,否则,发送给client一些可用节点。
B.client将文件分块,并行存储到不同DataNode节点上,发送完成以后,client同时发送信息给NameNode和DataNode。
C.NameNode收到client的信息后,发送信息给DataNode。
D.DataNode同时收到NameNode和DataNode的确认信息后,提交写操作。
3 解决小文件的方案
3.1 编写应用程序实现
public class AppForSmallFile { //定义文件读取的路径 private static final String OUTPATH = "hdfs://liaozhongmin:9000"; public static void main(String[] args) { //定义FSDataOutputStream对象 FSDataOutputStream fsDataoutputStream = null; //定义输入流读文件 InputStreamReader inputStreamReader = null; try { //创建合并后文件存储的的路径 Path path = new Path(OUTPATH + "/combinedFile"); //创建FSDataOutputStream对象 fsDataoutputStream = FileSystem.get(path.toUri(), new Configuration()).create(path); //创建要合并的小文件路径 File sourceDir = new File("C:\\Windows\\System32\\drivers\\etc"); //遍历小文件 for (File fileName : sourceDir.listFiles()){ //创建输入流 //fileInputStream = new FileInputStream(fileName.getAbsolutePath()); //只有这样才可以制定字符编码(没办法,Window是默认GBK的,Hadoop是默认UTF-8的,所以读的时候就会乱码) inputStreamReader = new InputStreamReader(new FileInputStream(fileName), "gbk"); //一行一行的读取 List<String> readLines = IOUtils.readLines(inputStreamReader); //然后再写出去 for (String line : readLines){ //写入一行 fsDataoutputStream.write(line.getBytes()); //写入一个换行符 fsDataoutputStream.write("\n".getBytes()); } } System.out.println("合并成功"); } catch (Exception e) { e.printStackTrace(); } finally{ try { inputStreamReader.close(); fsDataoutputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } }
注:这种方案是使用java文件相关操作,将众多的小文件写到一个文件中。
3.2 使用archive工具
创建文件 hadoop archive -archiveName xxx.har -p /src /dest 查看内部结构 hadoop fs -lsr /dest/xxx.har 查看内容 hadoop fs -lsr har:///dest/xxx.har
3.3 使用SequenceFile或者MapFile(以SequenceFile为例)
提供两种将小文件打成SequenceFile的方法:
方法一:
public class WriteSequenceMapReduce { // 定义输入路径 private static final String INPUT_PATH = "hdfs://master:9000/files"; // 定义输出路径 private static final String OUT_PATH = "hdfs://master:9000/seq/"; //定义文件系统 private static FileSystem fileSystem = null; public static void main(String[] args) { try { // 创建配置信息 Configuration conf = new Configuration(); // 创建文件系统 fileSystem = FileSystem.get(new URI(OUT_PATH), conf); // 如果输出目录存在,我们就删除 if (fileSystem.exists(new Path(OUT_PATH))) { fileSystem.delete(new Path(OUT_PATH), true); } // 创建任务 Job job = new Job(conf, WriteSequenceMapReduce.class.getName()); // 1.1 设置输入目录和设置输入数据格式化的类 FileInputFormat.setInputPaths(job, INPUT_PATH); // 1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型 job.setMapperClass(WriteSequenceMapper.class); // 2.3 指定输出的路径和设置输出的格式化类 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); // 提交作业 退出 System.exit(job.waitForCompletion(true) ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } } public static class WriteSequenceMapper extends Mapper<LongWritable, Text, Text, BytesWritable> { // 定义SequenceFile.Reader对象用于读文件 private static SequenceFile.Writer writer = null; // 定义配置信息 private static Configuration conf = null; // 定义最终输出的key和value private Text outkey = new Text(); private BytesWritable outValue = new BytesWritable(); //定义要合并的文件(存放在数组中) private FileStatus[] files = null; //定义输入流和一个字节数组 private InputStream inputStream = null; private byte[] buffer = null; @Override protected void setup(Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException, InterruptedException { try { // 创建配置信息 conf = new Configuration(); // 创建Path对象 Path path = new Path(INPUT_PATH); // 创建SequenceFile.Writer对象,并指定压缩格式 writer = SequenceFile.createWriter(fileSystem,conf, new Path(OUT_PATH+"/total.seq"), Text.class, BytesWritable.class, CompressionType.BLOCK, new BZip2Codec()); //writer = SequenceFile.createWriter(fileSystem,conf, new Path(OUT_PATH+"/total.seq"), Text.class, BytesWritable.class); //获取要合并的文件数组 files = fileSystem.listStatus(path); } catch (Exception e) { e.printStackTrace(); } } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, BytesWritable>.Context context) throws IOException, InterruptedException { //遍历文件数组 for (int i=0; i<files.length; i++){ //将文件名作为输出的key outkey.set(files[i].getPath().toString()); //创建输入流 inputStream = fileSystem.open(files[i].getPath()); //创建字节数组 buffer = new byte[(int) files[i].getLen()]; //通过工具类将文件读到字节数组中 IOUtils.readFully(inputStream, buffer, 0, buffer.length); //将字节数组中的内容及单个文件的内容作为value输出 outValue.set(new BytesWritable(buffer)); //关闭输入流 IOUtils.closeStream(inputStream); //将结果写到Sequencefile中 writer.append(outkey, outValue); } //关闭流 IOUtils.closeStream(writer); //System.exit(0); } } }
方法二:自定义InputFormat和RecordReader实现
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{ @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { //创建自定义的RecordReader WholeFileRecordReader reader = new WholeFileRecordReader(); reader.initialize(split, context); return reader; } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } }
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable>{ private FileSplit fileSplit; private Configuration conf; private BytesWritable value = new BytesWritable(); private boolean processed = false; public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException{ this.fileSplit = (FileSplit) split; this.conf = context.getConfiguration(); } /** * process表示记录是否已经被处理过了 */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!processed){ byte[] contents = new byte[(int) fileSplit.getLength()]; //获取路径 Path file = fileSplit.getPath(); //创建文件系统 FileSystem fileSystem = file.getFileSystem(conf); FSDataInputStream in = null; try { //打开文件 in = fileSystem.open(file); //将file文件中的内容放入contents数组中。使用了IOUtils工具类的readFully()方法,将in流中的内容读到contents字节数组中 IOUtils.readFully(in, contents, 0, contents.length); //BytesWritable是一个可用做key或value的字节序列,而ByteWritable是单个字节 //将value的内容设置为contents的值 value.set(contents, 0, contents.length); } catch (Exception e) { e.printStackTrace(); } finally{ IOUtils.closeStream(in); } processed = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return processed ? 1.0f : 0.0f; } @Override public void close() throws IOException { //do nothing } }
public class SmallFilesToSequenceFileConverter { // 定义输入路径 private static final String INPUT_PATH = "hdfs://master:9000/files/*"; // 定义输出路径 private static final String OUT_PATH = "hdfs://<span style="font-family: Arial, Helvetica, sans-serif;">master</span>:9000/seq/total.seq"; public static void main(String[] args) { try { // 创建配置信息 Configuration conf = new Configuration(); // 创建文件系统 FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf); // 如果输出目录存在,我们就删除 if (fileSystem.exists(new Path(OUT_PATH))) { fileSystem.delete(new Path(OUT_PATH), true); } // 创建任务 Job job = new Job(conf, SmallFilesToSequenceFileConverter.class.getName()); //1.1 设置输入目录和设置输入数据格式化的类 FileInputFormat.addInputPaths(job, INPUT_PATH); job.setInputFormatClass(WholeFileInputFormat.class); //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型 job.setMapperClass(SequenceFileMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个) job.setPartitionerClass(HashPartitioner.class); //千万不要有这句话,否则单个小文件的内容会输出到单独的一个Sequencefile文件中(简直内伤) //job.setNumReduceTasks(0); FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); job.setOutputFormatClass(SequenceFileOutputFormat.class); // 此处的设置是最终输出的key/value,一定要注意! job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); // 提交作业 退出 System.exit(job.waitForCompletion(true) ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } } public static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> { // 定义文件的名称作为key private Text fileNameKey = null; /** * task调用之前,初始化fileNameKey */ @Override protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException { // 获取分片 InputSplit split = context.getInputSplit(); // 获取输入目录 Path path = ((FileSplit) split).getPath(); // 设置fileNameKey fileNameKey = new Text(path.toString()); } @Override protected void map(NullWritable key, BytesWritable value, Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException { // 将fileNameKey作为输出的key(文件名),value作为输出的value(单个小文件的内容) System.out.println(fileNameKey.toString()); context.write(fileNameKey, value); } } }
注:方法二的这三个类可以实现将小文件写到一个SequenceFile中。
读取SequenceFile文件:
public class ReadSequenceMapReduce { // 定义输入路径 private static final String INPUT_PATH = "hdfs://master:9000/seq/total.seq"; // 定义输出路径 private static final String OUT_PATH = "hdfs://<span style="font-family: Arial, Helvetica, sans-serif;">master</span>:9000/seq/out"; //定义文件系统 private static FileSystem fileSystem = null; public static void main(String[] args) { try { // 创建配置信息 Configuration conf = new Configuration(); // 创建文件系统 fileSystem = FileSystem.get(new URI(OUT_PATH), conf); // 如果输出目录存在,我们就删除 if (fileSystem.exists(new Path(OUT_PATH))) { fileSystem.delete(new Path(OUT_PATH), true); } // 创建任务 Job job = new Job(conf, ReadSequenceMapReduce.class.getName()); // 1.1 设置输入目录和设置输入数据格式化的类 FileInputFormat.setInputPaths(job, INPUT_PATH); // 这个很重要,指定使用SequenceFileInputFormat类来处理我们的输入文件 job.setInputFormatClass(SequenceFileInputFormat.class); // 1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型 job.setMapperClass(ReadSequenceMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 1.3 设置分区和reduce数量 job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(0); // 最终输出的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 2.3 指定输出的路径和设置输出的格式化类 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); job.setOutputFormatClass(TextOutputFormat.class); // 提交作业 退出 System.exit(job.waitForCompletion(true) ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } } public static class ReadSequenceMapper extends Mapper<Text, BytesWritable, Text, Text> { //定义SequenceFile.Reader对象用于读文件 private static SequenceFile.Reader reader = null; //定义配置信息 private static Configuration conf = null; //定义最终输出的value private Text outValue = new Text(); /** * 在setUp()函数中初始化相关对象 */ @Override protected void setup(Mapper<Text, BytesWritable, Text, Text>.Context context) throws IOException, InterruptedException { try { // 创建配置信息 conf = new Configuration(); // 创建文件系统 //FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf); // 创建Path对象 Path path = new Path(INPUT_PATH); // 创建SequenceFile.Reader对象 reader = new SequenceFile.Reader(fileSystem, path, conf); } catch (Exception e) { e.printStackTrace(); } } @Override protected void map(Text key, BytesWritable value, Mapper<Text, BytesWritable, Text, Text>.Context context) throws IOException, InterruptedException { if (!"".equals(key.toString()) && !"".equals(value.get())){ //设置输出的value outValue.set(new String(value.getBytes(), 0, value.getLength())); //把结果写出去 context.write(key, outValue); } } } }