I. Generating HFiles with MapReduce
package insert.tools.hfile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestHFileToHBase {

    public static class TestHFileToHBaseMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines are "rowkey<TAB>value".
            String[] values = value.toString().split("\t", 2);
            byte[] row = Bytes.toBytes(values[0]);
            ImmutableBytesWritable k = new ImmutableBytesWritable(row);
            KeyValue kvProtocol = new KeyValue(row, "PROTOCOLID".getBytes(),
                    "PROTOCOLID".getBytes(), values[1].getBytes());
            context.write(k, kvProtocol);
            // KeyValue kvSrcip = new KeyValue(row, "SRCIP".getBytes(),
            //         "SRCIP".getBytes(), values[1].getBytes());
            // context.write(k, kvSrcip);
            // HFileOutputFormat.getRecordWriter
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "TestHFileToHBase");
        job.setJarByClass(TestHFileToHBase.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);
        job.setMapperClass(TestHFileToHBaseMapper.class);
        job.setReducerClass(KeyValueSortReducer.class);
        // job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat.class);
        // job.setNumReduceTasks(4);
        // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class);
        // HBaseAdmin admin = new HBaseAdmin(conf);
        HTable table = new HTable(conf, "hua");
        HFileOutputFormat.configureIncrementalLoad(job, table);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
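The mapper splits each input line on a tab into a row key and a PROTOCOLID value. A hypothetical way to run the job (the jar name and HDFS paths below are placeholders only; the table "hua" must already exist so that configureIncrementalLoad can read its region boundaries):

    hadoop jar testhfile.jar insert.tools.hfile.TestHFileToHBase /user/test/hfile-input /user/test/hfile-output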
II. The Improved HFileOutputFormat
The stock HFileOutputFormat can only generate HFiles for a single column family at a time; the improved HFileOutputFormat below can generate HFiles for multiple column families simultaneously. Sections between add markers are code added to the original source, and sections between revise markers are code modified from the original source. Reference: https://review.cloudera.org/r/1272/diff/1/?file=17977#file17977line93
/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package insert.tools.hfile;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.google.common.base.Preconditions;

/**
 * Writes HFiles. Passed KeyValues must arrive in order. Currently, can only
 * write files to a single column family at a time. Multiple column families
 * requires coordinating keys cross family. Writes current time as the sequence
 * id for the file. Sets the major compacted attribute on created hfiles.
 *
 * @see KeyValueSortReducer
 */
public class HFileOutputFormat extends
    FileOutputFormat<ImmutableBytesWritable, KeyValue> {
  static Log LOG = LogFactory.getLog(HFileOutputFormat.class);

  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(
      final TaskAttemptContext context) throws IOException,
      InterruptedException {
    // Get the path of the temporary output file
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context)
        .getWorkPath();
    Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    // revise
    // final long maxsize = conf.getLong("hbase.hregion.max.filesize",
    //     268435456);
    // final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536);
    final long maxsize = conf.getLong("hbase.hregion.max.filesize",
        HConstants.DEFAULT_MAX_FILE_SIZE);
    final int blocksize = conf.getInt("hfile.min.blocksize.size",
        HFile.DEFAULT_BLOCKSIZE);
    // -revise
    // Invented config. Add to hbase-*.xml if other than default
    // compression.
    final String compression = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());

    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
      // Map of families to writers and how much has been output on the
      // writer.
      private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>(
          Bytes.BYTES_COMPARATOR);
      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte[] now = Bytes.toBytes(System.currentTimeMillis());
      // add
      private boolean rollRequested = false;
      // -add

      public void write(ImmutableBytesWritable row, KeyValue kv)
          throws IOException {
        // add
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters();
          return;
        }
        byte[] rowKey = kv.getRow();
        // -add
        long length = kv.getLength();
        byte[] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);
        // revise
        // if (wl == null
        //     || ((length + wl.written) >= maxsize)
        //     && Bytes.compareTo(this.previousRow, 0,
        //         this.previousRow.length, kv.getBuffer(), kv
        //             .getRowOffset(), kv.getRowLength()) != 0) {
        //   // Get a new writer.
        //   Path basedir = new Path(outputdir, Bytes.toString(family));
        //   if (wl == null) {
        //     wl = new WriterLength();
        //     this.writers.put(family, wl);
        //     if (this.writers.size() > 1)
        //       throw new IOException("One family only");
        //     // If wl == null, first file in family. Ensure family
        //     // dir exits.
        //     if (!fs.exists(basedir))
        //       fs.mkdirs(basedir);
        //   }
        //   wl.writer = getNewWriter(wl.writer, basedir);
        //   LOG.info("Writer=" + wl.writer.getPath()
        //       + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
        //   wl.written = 0;
        // }
        // If this is a new column family, verify that the directory
        // exists
        if (wl == null) {
          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
        }
        // If any of the HFiles for the column families has reached
        // maxsize, we need to roll all the writers
        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }
        // This can only happen once a row is finished though
        if (rollRequested
            && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters();
        }
        // create a new HLog writer, if necessary
        if (wl == null || wl.writer == null) {
          wl = getNewWriter(family);
        }
        // we now have the proper HLog writer. full steam ahead
        // -revise
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;
        // Copy the row so we know when a row transition.
        // revise
        // this.previousRow = kv.getRow();
        this.previousRow = rowKey;
        // -revise
      }

      // revise
      // /*
      //  * Create a new HFile.Writer. Close current if there is one.
      //  *
      //  * @param writer
      //  *
      //  * @param familydir
      //  *
      //  * @return A new HFile.Writer.
      //  *
      //  * @throws IOException
      //  */
      // private HFile.Writer getNewWriter(final HFile.Writer writer,
      //     final Path familydir) throws IOException {
      //   close(writer);
      //   return new HFile.Writer(fs, StoreFile.getUniqueFile(fs,
      //       familydir), blocksize, compression,
      //       KeyValue.KEY_COMPARATOR);
      // }
      private void rollWriters() throws IOException {
        for (WriterLength wl : this.writers.values()) {
          if (wl.writer != null) {
            LOG.info("Writer=" + wl.writer.getPath()
                + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
            close(wl.writer);
          }
          wl.writer = null;
          wl.written = 0;
        }
        this.rollRequested = false;
      }

      /*
       * Create a new HFile.Writer.
       *
       * @param family
       *
       * @return A WriterLength, containing a new HFile.Writer.
       *
       * @throws IOException
       */
      private WriterLength getNewWriter(byte[] family) throws IOException {
        WriterLength wl = new WriterLength();
        Path familydir = new Path(outputdir, Bytes.toString(family));
        wl.writer = new HFile.Writer(fs, StoreFile.getUniqueFile(fs,
            familydir), blocksize, compression, KeyValue.KEY_COMPARATOR);
        this.writers.put(family, wl);
        return wl;
      }
      // -revise

      private void close(final HFile.Writer w) throws IOException {
        if (w != null) {
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.close();
        }
      }

      // revise
      // public void close(TaskAttemptContext c) throws IOException,
      //     InterruptedException {
      //   for (Map.Entry<byte[], WriterLength> e : this.writers.entrySet()) {
      //     close(e.getValue().writer);
      //   }
      // }
      public void close(TaskAttemptContext c) throws IOException,
          InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
      // -revise
    };
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    HFile.Writer writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table, as a list of
   * ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
      throws IOException {
    byte[][] byteKeys = table.getStartKeys();
    ArrayList<ImmutableBytesWritable> ret = new ArrayList<ImmutableBytesWritable>(
        byteKeys.length);
    for (byte[] byteKey : byteKeys) {
      ret.add(new ImmutableBytesWritable(byteKey));
    }
    return ret;
  }

  /**
   * Write out a SequenceFile that can be read by TotalOrderPartitioner that
   * contains the split points in startKeys.
   *
   * @param partitionsPath
   *          output path for SequenceFile
   * @param startKeys
   *          the region start keys
   */
  private static void writePartitions(Configuration conf,
      Path partitionsPath, List<ImmutableBytesWritable> startKeys)
      throws IOException {
    Preconditions.checkArgument(!startKeys.isEmpty(), "No regions passed");
    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0
    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<ImmutableBytesWritable>(
        startKeys);
    ImmutableBytesWritable first = sorted.first();
    Preconditions.checkArgument(
        first.equals(HConstants.EMPTY_BYTE_ARRAY),
        "First region of table should have empty start key. Instead has: %s",
        Bytes.toStringBinary(first.get()));
    sorted.remove(first);

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
        partitionsPath, ImmutableBytesWritable.class, NullWritable.class);
    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the
   *       DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of
   *       regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat's
   *       requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either
   *       KeyValueSortReducer or PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either
   * KeyValue or Put before running this function.
   */
  public static void configureIncrementalLoad(Job job, HTable table)
      throws IOException {
    Configuration conf = job.getConfiguration();
    job.setPartitionerClass(TotalOrderPartitioner.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    // Based on the configured map output class, set the correct reducer to
    // properly sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type:"
          + job.getMapOutputValueClass());
    }

    LOG.info("Looking up current regions for table " + table);
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
        + "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    Path partitionsPath = new Path(job.getWorkingDirectory(), "partitions_"
        + System.currentTimeMillis());
    LOG.info("Writing partition information to " + partitionsPath);

    FileSystem fs = partitionsPath.getFileSystem(conf);
    writePartitions(conf, partitionsPath, startKeys);
    partitionsPath.makeQualified(fs);
    URI cacheUri;
    try {
      cacheUri = new URI(partitionsPath.toString() + "#"
          + TotalOrderPartitioner.DEFAULT_PATH);
    } catch (URISyntaxException e) {
      throw new IOException(e);
    }
    DistributedCache.addCacheFile(cacheUri, conf);
    DistributedCache.createSymlink(conf);

    LOG.info("Incremental table output configured.");
  }
}
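Because the modified format keeps one writer per column family, a mapper can now emit KeyValues for several families of the same row. A minimal sketch (not part of the original code; the input layout and the SRCIP family name are assumptions for illustration, and the class reuses the imports of TestHFileToHBase above):

    public static class MultiFamilyMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // assumed layout: rowkey <TAB> protocol id <TAB> source ip
            String[] fields = value.toString().split("\t", 3);
            byte[] row = Bytes.toBytes(fields[0]);
            ImmutableBytesWritable k = new ImmutableBytesWritable(row);
            // one KeyValue per column family; the modified HFileOutputFormat
            // keeps a separate HFile writer under each family subdirectory
            context.write(k, new KeyValue(row, Bytes.toBytes("PROTOCOLID"),
                    Bytes.toBytes("PROTOCOLID"), Bytes.toBytes(fields[1])));
            context.write(k, new KeyValue(row, Bytes.toBytes("SRCIP"),
                    Bytes.toBytes("SRCIP"), Bytes.toBytes(fields[2])));
        }
    }

The driver stays the same as in TestHFileToHBase; configureIncrementalLoad still selects KeyValueSortReducer because the map output value class is KeyValue.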
III. Notes on Generating HFiles with MapReduce
1. Whether the map or the reduce phase produces the final output, the output key and value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put>.
2. A map or reduce output value type of KeyValue corresponds to KeyValueSortReducer, and Put corresponds to PutSortReducer (see the Put-based sketch after this list).
3. In the MapReduce example, job.setOutputFormatClass(HFileOutputFormat.class) refers to the improved HFileOutputFormat, which can generate HFiles for multiple column families at the same time; the stock version can only organize a single column family into HFiles at a time.
4. In the MapReduce example, HFileOutputFormat.configureIncrementalLoad(job, table) configures the job automatically. The total order partitioner it sets up (TotalOrderPartitioner, analogous to SimpleTotalOrderPartitioner) first sorts the keys globally and then distributes them to the reducers, so the key ranges handled by the individual reducers never overlap.
This is required because, when the data is loaded into HBase, the keys within each Region must be in strictly sorted order.
5. The HFiles produced by the MapReduce example are stored on HDFS, with one subdirectory per column family under the output path. Loading the HFiles into HBase is essentially a move of the HFiles into the HBase Regions, after which the column-family subdirectories under the output path are left empty.
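For the <ImmutableBytesWritable, Put> combination mentioned in points 1 and 2, the mapper emits Put objects and configureIncrementalLoad then picks PutSortReducer automatically. A minimal sketch, again assuming the tab-separated input layout used above (not part of the original example):

    public static class PutBulkLoadMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t", 2);
            byte[] row = Bytes.toBytes(fields[0]);
            Put put = new Put(row);
            // old-style HBase API: add(family, qualifier, value)
            put.add(Bytes.toBytes("PROTOCOLID"), Bytes.toBytes("PROTOCOLID"),
                    Bytes.toBytes(fields[1]));
            context.write(new ImmutableBytesWritable(row), put);
        }
    }

In the driver, set the map output value class to Put (job.setMapOutputValueClass(Put.class)) before calling HFileOutputFormat.configureIncrementalLoad(job, table), so that PutSortReducer is chosen instead of KeyValueSortReducer.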
IV. Loading HFiles into HBase
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class TestLoadIncrementalHFileToHBase {

    // private static final byte[] TABLE = Bytes.toBytes("hua");
    // private static final byte[] QUALIFIER = Bytes.toBytes("PROTOCOLID");
    // private static final byte[] FAMILY = Bytes.toBytes("PROTOCOLID");

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // byte[] TABLE = Bytes.toBytes("hua");
        byte[] TABLE = Bytes.toBytes(args[0]);
        HTable table = new HTable(conf, TABLE);
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(args[1]), table);
        // loader.doBulkLoad(new Path("/hua/testHFileResult/"), table);
    }
}
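A hypothetical invocation of this loader, assuming it is packaged into a placeholder jar named bulkload.jar, the table is "hua", and the HFiles were written to /hua/testHFileResult/ by the job above:

    hadoop jar bulkload.jar TestLoadIncrementalHFileToHBase hua /hua/testHFileResult/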
V. Notes on Loading HFiles into HBase
1. The generated HFiles are loaded through the doBulkLoad method of HBase's LoadIncrementalHFiles. In the example, the first command-line argument is the table name and the second is the HFile directory (the output path of the MapReduce job above). The column families can also be loaded one by one into the corresponding families of the HBase table.
2. Related links on how to perform the load:
http://hbase.apache.org/docs/r0.89.20100726/bulk-loads.html
http://hbase.apache.org/docs/r0.20.6/api/org/apache/hadoop/hbase/mapreduce/package-summary.html#bulk
http://genius-bai.javaeye.com/blog/641927
3. Loading can be done either from code or from a script. There are two code-based approaches. The first is
hadoop jar hbase-VERSION.jar completebulkload /myoutput mytable;
and the other is to use the TestLoadIncrementalHFileToHBase class shown above.
The script-based approach is: jruby $HBASE_HOME/bin/loadtable.rb hbase-mytable hadoop-hbase-hfile-outputdir