A Hive query is not fundamentally different from an ordinary Hadoop MapReduce job: both brute-force scan the raw data. If we could use an index the way a database does, data scanning would become dramatically faster.
Last time I used an index on top of MapReduce; see the link below for the details:
http://user.qzone.qq.com/165162897/blog/1351432946
This time I extend that work to Hive (under the hood it is again just a special InputFormat). Usage looks like this:
1. Create the index (nothing much to say here; see the full source code at the end)
hadoop jar ./higo-manager-1.3.1-SNAPSHOT.jar com.alipay.higo.hadoop.sequenceIndex.SequenceIndexExample create /group/tbdev/lingning/yannian.mu/input/1.txt /group/tbdev/lingning/yannian.mu/output 20
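What the create job produces: for each output key, the reducer (IndexReducer in the source below) builds a small Lucene index inside a RAMDirectory, and SequenceIndexOutputFormat then serializes that directory's files, prefixed by the key, into the output file. The following stand-alone sketch only mirrors how one such per-key index is built; the class name, field names and values are illustrative, not part of the project.

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.store.RAMDirectory;

public class BuildOneIndexSketch {
    public static void main(String[] args) throws Exception {
        // One in-memory Lucene index; in the real job this RAMDirectory is wrapped
        // in a HadoopDirectory, emitted as the reduce value, and written out by
        // SequenceIndexOutputFormat / SequenceIndex.Writer.append().
        RAMDirectory dir = new RAMDirectory();
        IndexWriter writer = new IndexWriter(dir, null,
                new KeepOnlyLastCommitDeletionPolicy(), MaxFieldLength.UNLIMITED);
        writer.setUseCompoundFile(false); // keep individual index files, as IndexReducer does

        Document doc = new Document();
        // Columns are stored, not analyzed, so they can be returned verbatim to Hive.
        doc.add(new Field("col1", "1-something", Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
        doc.add(new Field("col2", "foo", Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
        writer.addDocument(doc);
        writer.optimize();
        writer.close();

        System.out.println("index files: " + java.util.Arrays.toString(dir.listAll()));
    }
}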
2. Create the Hive table (apart from the InputFormat, nothing special)
CREATE EXTERNAL TABLE yannian_hive_index_test(col1 String,col2 String,col3 String,col4 String,col5 String)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
STORED AS INPUTFORMAT 'com.alipay.higo.hadoop.sequenceIndex.SequenceIndexInputFormatForHive'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/group/tbdev/lingning/yannian.mu/output'
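Why FIELDS TERMINATED BY '\001': the InputFormat hands Hive plain text rows in which the columns listed in hive.fields.sequence are joined with the \001 character, and any column that was not loaded from the Lucene index is filled with "-" (see HiveTarRecordReader.next in the source below). A minimal illustrative sketch of that row assembly, with made-up values:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RowJoinSketch {
    public static void main(String[] args) {
        // Column order comes from hive.fields.sequence.
        List<String> hiveFields = Arrays.asList("col1", "col2", "col3", "col4", "col5");
        // Values actually fetched from the Lucene index (lucene.fields).
        Map<String, String> fromIndex = new HashMap<String, String>();
        fromIndex.put("col1", "123");
        fromIndex.put("col3", "abc");

        StringBuilder row = new StringBuilder();
        String sep = "";
        for (String f : hiveFields) {
            row.append(sep).append(fromIndex.containsKey(f) ? fromIndex.get(f) : "-");
            sep = "\001"; // same delimiter as the table's ROW FORMAT
        }
        // Shown with '|' instead of the invisible \001: prints 123|-|abc|-|-
        System.out.println(row.toString().replace('\001', '|'));
    }
}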
3. Add the required jars
add jar ./higo-index-1.3.1-SNAPSHOT.jar; // the InputFormat implementation
add jar ./lucene-core-3.5-SNAPSHOT.jar; // the Lucene dependency
4. Filter with the Lucene index before the query
//Set the Hive table columns; the order and number must exactly match the table definition above
set hive.fields.sequence=col1,col2,col3,col4,col5;
//Set the columns the local Lucene lookup uses; only these columns are read from the index
set lucene.fields=col1,col3,col2;
//The Lucene query condition: this one scans only the rows whose col1 value has the prefix 1
set lucene.query=col1:1*;
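For reference, this is roughly how the three settings are consumed (it mirrors HiveTarRecordReader in the source below): hive.fields.sequence fixes the output column order, lucene.fields decides which stored fields are loaded from each embedded index (via a MapFieldSelector), and lucene.query is parsed by Lucene's QueryParser with default field "index" and run against every per-key index. A stand-alone sketch using the same keys and values as above:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.Version;

public class SettingsSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hive.fields.sequence", "col1,col2,col3,col4,col5");
        conf.set("lucene.fields", "col1,col3,col2");
        conf.set("lucene.query", "col1:1*");

        List<String> hiveFields = Arrays.asList(conf.get("hive.fields.sequence", "").split(","));
        List<String> luceneFields = Arrays.asList(conf.get("lucene.fields", "").split(","));

        // "col1:1*" becomes a prefix query on col1, so only matching rows are handed to Hive.
        QueryParser parser = new QueryParser(Version.LUCENE_35, "index",
                new StandardAnalyzer(Version.LUCENE_35));
        Query q = parser.parse(conf.get("lucene.query"));
        System.out.println(hiveFields + " / " + luceneFields + " / " + q);
    }
}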
5. Analyze the Lucene-filtered result with Hive as usual
select col1,col3 from yannian_hive_index_test limit 1000;
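Because the filtering happens inside the InputFormat, any query against this table only ever sees the rows that matched lucene.query, so heavier analysis benefits in the same way; for example (an illustrative query, not from the original test run) something like select col3, count(*) from yannian_hive_index_test group by col3; would aggregate only over the rows whose col1 starts with "1".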
Not too complicated, right? Here is the complete implementation code.
package com.alipay.higo.hadoop.sequenceIndex; import java.io.DataInput; import java.io.DataOutput; import java.io.FileNotFoundException; import java.io.IOException; import java.rmi.server.UID; import java.security.MessageDigest; import java.util.Arrays; import java.util.HashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.VersionMismatchException; import org.apache.hadoop.io.SequenceFile.Metadata; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.Progressable; import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; /** * 基于lucene索引的顺序索引 * * @author yannian.mu * */ public class SequenceIndex { private static final Log LOG = LogFactory.getLog(SequenceIndex.class); private static final byte VERSION_WITH_METADATA = (byte) 6; private static final int SYNC_ESCAPE = -1; // "length" of sync entries private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash public static final int SYNC_INTERVAL = 100 * SYNC_SIZE; private static byte[] VERSION = new byte[] { (byte) 'S', (byte) 'E', (byte) 'I', VERSION_WITH_METADATA }; public static Writer create(FileSystem fs, Configuration conf, Path name, int bufferSize, short replication, long blockSize, Progressable progress, Metadata metadata) throws IOException { return new Writer(fs, conf, name, bufferSize, replication, blockSize, progress, metadata); } public static Reader open(FileSystem fs, Path file, int bufferSize, long start, long length, Configuration conf, boolean tempReader) throws IOException { return new Reader(fs, file, bufferSize, start, length, conf, tempReader); } public static class Writer implements java.io.Closeable { Configuration conf; FSDataOutputStream out; boolean ownOutputStream = true; DataOutputBuffer buffer = new DataOutputBuffer(); Metadata metadata = null; long lastSyncPos; // position of last sync byte[] sync; // 16 random bytes { try { MessageDigest digester = MessageDigest.getInstance("MD5"); long time = System.currentTimeMillis(); digester.update((new UID() + "@" + time).getBytes()); sync = digester.digest(); } catch (Exception e) { throw new RuntimeException(e); } } public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize, short replication, long blockSize, Progressable progress, Metadata metadata) throws IOException { this.conf = conf; this.out = fs.create(name, true, bufferSize, replication, blockSize, progress); this.metadata = metadata; out.write(VERSION); this.metadata.write(out); out.write(sync); // write the sync bytes out.flush(); } public void sync() throws IOException { if (sync != null && lastSyncPos != out.getPos()) { out.writeInt(SYNC_ESCAPE); // mark the start of the sync out.write(sync); // write sync lastSyncPos = out.getPos(); // update lastSyncPos } } public Configuration getConf() { return conf; } public synchronized void close() throws IOException { if (out != null) { if (ownOutputStream) { out.close(); } else { 
out.flush(); } out = null; } } synchronized void checkAndWriteSync() throws IOException { if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) { // time sync(); } } public synchronized void append(Text key, Directory dir) throws IOException { checkAndWriteSync(); String[] names=dir.listAll(); out.writeInt(key.getLength()); out.write(key.getBytes(), 0, key.getLength()); out.writeInt(names.length); for (String name : dir.listAll()) { Text nameText=new Text(name); out.writeInt(nameText.getLength()); out.write(nameText.getBytes(), 0,nameText.getLength()); long filelen=dir.fileLength(name); out.writeLong(filelen); this.writeTo(filelen, dir.openInput(name.toString()), out); } } private void writeTo(long end,IndexInput input,FSDataOutputStream out) throws IOException { long pos = 0; int bufflen=1024; while (pos < end) { int length = bufflen; long nextPos = pos + length; if (nextPos > end) { // at the last buffer length = (int)(end - pos); } byte[] buff=new byte[length]; input.readBytes(buff, 0, length); out.write(buff,0,length); pos = nextPos; } } public synchronized long getLength() throws IOException { return out.getPos(); } } public static class Reader implements java.io.Closeable { private Path file; private FSDataInputStream in; private FSDataInputStream shardIn; private byte version; private Metadata metadata = null; private byte[] sync = new byte[SYNC_HASH_SIZE]; private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; private boolean syncSeen; private long end; private Configuration conf; private Reader(FileSystem fs, Path file, int bufferSize, long start, long length, Configuration conf, boolean tempReader) throws IOException { this.file = file; this.in = fs.open(file, bufferSize); this.shardIn=fs.open(file, bufferSize); this.conf = conf; seek(start); this.end = in.getPos() + length; init(tempReader); } private void init(boolean tempReader) throws IOException { byte[] versionBlock = new byte[VERSION.length]; in.readFully(versionBlock); if ((versionBlock[0] != VERSION[0]) || (versionBlock[1] != VERSION[1]) || (versionBlock[2] != VERSION[2])) throw new IOException(file + " not a SequenceIndex"); version = versionBlock[3]; if (version > VERSION[3]) throw new VersionMismatchException(VERSION[3], version); this.metadata = new Metadata(); if (version >= VERSION_WITH_METADATA) { // if version >= 6 this.metadata.readFields(in); } if (version > 1) { // if version > 1 in.readFully(sync); // read sync bytes } } public synchronized void close() throws IOException { in.close(); this.shardIn.close(); } private synchronized int readKeyLength() throws IOException { if (in.getPos() >= end) { return -1; } int length = in.readInt(); if (version > 1 && sync != null && length == SYNC_ESCAPE) { // process in.readFully(syncCheck); // read syncCheck if (!Arrays.equals(sync, syncCheck)) // check it throw new IOException("File is corrupt!"); syncSeen = true; if (in.getPos() >= end) { return -1; } length = in.readInt(); // re-read length } else { syncSeen = false; } return length; } public synchronized int next(Text key, SequenceIndexDirectory dir) throws IOException { int length = readKeyLength(); if (length == -1) { return -1; } dir.setShareStream(this.shardIn); byte[] keydata = new byte[length]; in.read(keydata, 0, length); key.set(keydata); int filecount = in.readInt(); for (int i = 0; i < filecount; i++) { int namelen = in.readInt(); byte[] namebyte = new byte[namelen]; in.read(namebyte, 0, namelen); Text name = new Text(namebyte); long filelen = in.readLong(); long pos = in.getPos(); 
in.skip(filelen); dir.addFile(name.toString(), pos, filelen); } return length; } public Metadata getMetadata() { return this.metadata; } Configuration getConf() { return conf; } public synchronized void seek(long position) throws IOException { in.seek(position); } public synchronized void sync(long position) throws IOException { if (position + SYNC_SIZE >= end) { seek(end); return; } try { seek(position + 4); // skip escape in.readFully(syncCheck); int syncLen = sync.length; for (int i = 0; in.getPos() < end; i++) { int j = 0; for (; j < syncLen; j++) { if (sync[j] != syncCheck[(i + j) % syncLen]) break; } if (j == syncLen) { in.seek(in.getPos() - SYNC_SIZE); // position before // sync return; } syncCheck[i % syncLen] = in.readByte(); } } catch (ChecksumException e) { // checksum failure handleChecksumException(e); } } private void handleChecksumException(ChecksumException e) throws IOException { if (this.conf.getBoolean("io.skip.checksum.errors", false)) { LOG.warn("Bad checksum at " + getPosition() + ". Skipping entries."); sync(getPosition()+ this.conf.getInt("io.bytes.per.checksum", 512)); } else { throw e; } } public boolean syncSeen() { return syncSeen; } public synchronized long getPosition() throws IOException { return in.getPos(); } public String toString() { return file.toString(); } } public static class HadoopDirectory implements WritableComparable{ Directory dir=null; public Directory getDir() { return dir; } public void setDir(Directory dir) { this.dir = dir; } @Override public void write(DataOutput out) throws IOException { throw new UnsupportedOperationException(); } @Override public void readFields(DataInput in) throws IOException { throw new UnsupportedOperationException(); } @Override public int compareTo(Object arg0) { throw new UnsupportedOperationException(); } } public static class SequenceIndexDirectory extends Directory { private static int BUFFER_SIZE = 1024; private static final class FileEntry { long offset; long length; public FileEntry(long offset, long length) { this.offset = offset; this.length = length; } } private FSDataInputStream shareStream; private HashMap<String, FileEntry> entries = new HashMap<String, FileEntry>(); @Override public synchronized void close() throws IOException { if (shareStream == null) throw new IOException("Already closed"); entries.clear(); shareStream = null; } public void setShareStream(FSDataInputStream _stream) { this.shareStream = _stream; } public void addFile(String name, long offset, long length) { entries.put(name, new FileEntry(offset, length)); } @Override public synchronized IndexInput openInput(String id) throws IOException { return openInput(id, BUFFER_SIZE); } @Override public synchronized IndexInput openInput(String id, int readBufferSize) throws IOException { if (shareStream == null) throw new IOException("Stream closed"); FileEntry entry = entries.get(id); if (entry == null) { throw new IOException("No sub-file with id " + id + " found (files: " + entries.keySet() + ")"); } return new ShareIndexInput(id,shareStream, entry.offset, entry.length); } @Override public String[] listAll() { return entries.keySet().toArray(new String[entries.size()]); } @Override public boolean fileExists(String name) { return entries.containsKey(name); } @Override public long fileModified(String name) throws IOException { throw new UnsupportedOperationException(); } @Override @Deprecated public void touchFile(String name) throws IOException { throw new UnsupportedOperationException(); } @Override public void deleteFile(String name) { 
throw new UnsupportedOperationException(); } public void renameFile(String from, String to) { throw new UnsupportedOperationException(); } @Override public long fileLength(String name) throws IOException { FileEntry e = entries.get(name); if (e == null) throw new FileNotFoundException(name); return e.length; } @Override public IndexOutput createOutput(String name) { throw new UnsupportedOperationException(); } @Override public Lock makeLock(String name) { throw new UnsupportedOperationException(); } public static class ShareIndexInput extends BufferedIndexInput { public class Descriptor{ FSDataInputStream in=null; public FSDataInputStream getIn() { return in; } public void setIn(FSDataInputStream in) { this.in = in; } public void close() { } } private final Descriptor descriptor; private final long length; @Override public String toString() { return "ShareIndexInput [length=" + length + ", fileOffset=" + fileOffset + ", filename=" + filename + "]"; } private boolean isOpen; private boolean isClone; private long fileOffset; private String filename; public ShareIndexInput(String _filename,FSDataInputStream shareStream, long _fileOffset, long _length) throws IOException { super("sequenceIndex input"); this.filename=_filename; this.descriptor = new Descriptor(); this.descriptor.setIn(shareStream); this.fileOffset = _fileOffset; this.length = _length; this.isOpen = true; this.isClone = false; } protected void readInternal(byte[] b, int offset, int len) throws IOException { synchronized (descriptor.in) { long position = getFilePointer(); if ((position+this.fileOffset) != descriptor.in.getPos()) { descriptor.in.seek(position+this.fileOffset); } int total = 0; do { int i = descriptor.in.read(b, offset + total, len - total); if (i == -1) { throw new IOException("Read past EOF"); } total += i; } while (total < len); } } public void close() throws IOException { if (!isClone) { if (isOpen) { descriptor.close(); isOpen = false; } else { throw new IOException("Index file already closed"); } } } public long length() { return length; } protected void finalize() throws IOException { if (!isClone && isOpen) { close(); } } public Object clone() { ShareIndexInput clone = (ShareIndexInput) super.clone(); clone.isClone = true; return clone; } @Override protected void seekInternal(long pos) throws IOException { } } } } ################################################################################### package com.alipay.higo.hadoop.sequenceIndex; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.Fieldable; import org.apache.lucene.document.Field.Index; import org.apache.lucene.document.Field.Store; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import 
org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; import org.apache.lucene.index.IndexWriter.MaxFieldLength; import org.apache.lucene.queryParser.ParseException; import org.apache.lucene.queryParser.QueryParser; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.Directory; import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.util.Version; import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.HadoopDirectory; public class SequenceIndexExample { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { String type=args[0]; String input=args[1]; String output=args[2]; Integer numreduce=Integer.parseInt(args[3]); if(type.equals("create")) { create(input, output,numreduce); }if(type.equals("searchold")) { searchOld(input, output,numreduce); }else{ search(input, output,numreduce); } } private static void search(String input,String output,int numreduce) throws IOException, InterruptedException, ClassNotFoundException { Job job = new Job(new Configuration()); job.setInputFormatClass(SequenceIndexInputFormat.class); SequenceIndexInputFormat.addInputPath(job, new Path(input)); job.setMapperClass(IndexMap.class); job.setJarByClass(SequenceIndexExample.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); TextOutputFormat.setOutputPath(job, new Path(output)); job.setNumReduceTasks(numreduce); job.waitForCompletion(true); } private static void searchOld(String input,String output,int numreduce) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf=new Configuration(); conf.set("hive.fields.sequence","index,col1,col2,col3,col4,col5"); conf.set("lucene.fields","index,col3"); conf.set("lucene.query","index:500"); JobConf jobconf=new JobConf(conf, SequenceIndexInputFormatForHive.class); jobconf.setJobName("oldsearch"); jobconf.setNumReduceTasks(numreduce); jobconf.setInputFormat(SequenceIndexInputFormatForHive.class); jobconf.setMapperClass(OldMapper.class); jobconf.setOutputKeyClass(Text.class); jobconf.setOutputValueClass(Text.class); SequenceIndexInputFormatForHive.addInputPath(jobconf, new Path(input)); org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobconf,new Path(output)); RunningJob rj = JobClient.runJob(jobconf); } public static class OldMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, BytesWritable, Text, Text> { @Override public void configure(JobConf job) { } @Override public void close() throws IOException { } @Override public void map(LongWritable key, BytesWritable value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { output.collect(new Text(String.valueOf(key.get())), new Text(value.get())); } } private static void create(String input,String output,int numreduce) throws IOException, InterruptedException, ClassNotFoundException { Job job = new Job(new Configuration()); FileInputFormat.addInputPath(job, new Path(input)); job.setJarByClass(SequenceIndexExample.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(IndexReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(HadoopDirectory.class); job.setOutputFormatClass(SequenceIndexOutputFormat.class); SequenceIndexOutputFormat.setOutputPath(job, new Path(output)); job.setNumReduceTasks(numreduce); job.waitForCompletion(true); } public static class IndexMap extends 
Mapper<Text, HadoopDirectory, Text, Text> { protected void map(Text key, HadoopDirectory value, Context context) throws IOException, InterruptedException { Directory dir = value.getDir(); IndexReader reader = IndexReader.open(dir); StandardAnalyzer an = new StandardAnalyzer(Version.LUCENE_35); QueryParser q = new QueryParser(Version.LUCENE_35, "index", an); IndexSearcher searcher = new IndexSearcher(reader); TopDocs docs; try { docs = searcher.search(q.parse("index:500"), 20); } catch (ParseException e) { throw new RuntimeException(e); } ScoreDoc[] list = docs.scoreDocs; if (list != null && list.length > 0) { StringBuffer buff = new StringBuffer(); for (ScoreDoc doc : list) { Document document = searcher.doc(doc.doc); for (Fieldable f : document.getFields()) { buff.append(f.name() + "=" + document.getFieldable(f.name()).stringValue() + ","); } context.write(key, new Text(buff.toString())); } } } } public static class IndexReducer extends Reducer<LongWritable, Text, Text, HadoopDirectory> { boolean setup=false; protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws java.io.IOException, InterruptedException { if(setup) { return; } setup=true; for(int k=0;k<10000;k++) { HadoopDirectory hdir=new HadoopDirectory(); hdir.setDir(new RAMDirectory()); IndexWriter writer = new IndexWriter(hdir.getDir(), null, new KeepOnlyLastCommitDeletionPolicy(), MaxFieldLength.UNLIMITED); writer.setUseCompoundFile(false); writer.setMergeFactor(2); System.out.println(k); for(int i=0;i<1000;i++) { Document doc=new Document(); doc.add(new Field("index", String.valueOf(i), Store.YES, Index.NOT_ANALYZED_NO_NORMS)); for(int j=0;j<10;j++) { doc.add(new Field("col"+j, String.valueOf(i)+","+j+","+k, Store.YES, Index.NOT_ANALYZED_NO_NORMS)); } writer.addDocument(doc); } writer.optimize(); writer.close(); context.write(new Text(String.valueOf(k)), hdir); } } } } ##################################################################### package com.alipay.higo.hadoop.sequenceIndex; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.HadoopDirectory; import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.SequenceIndexDirectory; public class SequenceIndexInputFormat extends FileInputFormat<Text,HadoopDirectory>{ @Override public RecordReader<Text,HadoopDirectory> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException { try { return new SequenceIndexRecordReader(split,context); } catch (InterruptedException e) { throw new IOException(e); } } @Override protected long getFormatMinSplitSize() { return SequenceIndex.SYNC_INTERVAL; } public static class SequenceIndexRecordReader extends RecordReader<Text,HadoopDirectory>{ private SequenceIndex.Reader in; private long start; private long end; private boolean more = true; private Text key = null; private HadoopDirectory value = null; protected Configuration conf; public void initialize(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException { } public SequenceIndexRecordReader(InputSplit split, 
TaskAttemptContext context ) throws IOException, InterruptedException { FileSplit fileSplit = (FileSplit) split; this.init(context.getConfiguration(), fileSplit.getPath(), fileSplit.getStart(), fileSplit.getLength()); } public SequenceIndexRecordReader(Configuration _conf,Path _path,long _start,long _len) throws IOException, InterruptedException { this.init(_conf, _path, _start, _len); } private void init(Configuration _conf, Path path, long _start, long len) throws IOException, InterruptedException { conf = _conf; FileSystem fs = path.getFileSystem(conf); this.in = SequenceIndex.open(fs, path, conf.getInt( "io.file.buffer.size", 4096), 0, fs.getFileStatus(path) .getLen(), conf, false);// new SequenceFile.Reader(fs, path, this.end = _start + len; if (_start > in.getPosition()) { in.sync(_start); // sync to start } this.start = in.getPosition(); more = _start < end; } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!more) { return false; } long pos = in.getPosition(); this.key=new Text(); this.value=new HadoopDirectory(); SequenceIndexDirectory dir=new SequenceIndexDirectory(); this.value.setDir(dir); if(this.in.next(this.key, dir)<0||(pos >= end && in.syncSeen())) { more = false; key = null; value = null; } return more; } @Override public Text getCurrentKey() { return key; } @Override public HadoopDirectory getCurrentValue() { return value; } public long getpos() throws IOException { return in.getPosition(); } public float getProgress() throws IOException { if (end == start) { return 0.0f; } else { return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start)); } } public synchronized void close() throws IOException { in.close(); } } } ############################################################# package com.alipay.higo.hadoop.sequenceIndex; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Fieldable; import org.apache.lucene.document.MapFieldSelector; import org.apache.lucene.index.IndexReader; import org.apache.lucene.queryParser.ParseException; import org.apache.lucene.queryParser.QueryParser; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.util.Version; import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.HadoopDirectory; import com.alipay.higo.hadoop.sequenceIndex.SequenceIndexInputFormat.SequenceIndexRecordReader; public class SequenceIndexInputFormatForHive extends SequenceFileInputFormat<LongWritable, BytesWritable> { public RecordReader<LongWritable, BytesWritable> getRecordReader( InputSplit split, JobConf job, Reporter reporter) throws IOException { FileSplit part = (FileSplit) split; return new HiveTarRecordReader(job, part); } public static class HiveTarRecordReader implements RecordReader<LongWritable, BytesWritable> { private SequenceIndexRecordReader seqReader = null; IndexReader reader=null; private String hive_fields = ""; 
private ArrayList<String> rowFields = new ArrayList<String>(); private ArrayList<String> lucene_fields = new ArrayList<String>(); private String lucene_query = ""; private HadoopDirectory dir; private IndexSearcher searcher; private ScoreDoc[] list; private int lineIndex = -1; FileSplit split; public HiveTarRecordReader(Configuration conf, FileSplit _split) throws IOException { this.hive_fields = conf.get("hive.fields.sequence",""); this.split=_split; for(String f:this.hive_fields.split(",")) { this.rowFields.add(f); } for(String f:conf.get("lucene.fields","").split(",")) { this.lucene_fields.add(f); } this.lucene_query = conf.get("lucene.query"); try { seqReader = new SequenceIndexRecordReader(conf,_split.getPath(),_split.getStart(),_split.getLength()); } catch (InterruptedException e) { throw new IOException(e); } } public synchronized boolean next(LongWritable pos, BytesWritable k) throws IOException { while (lineIndex == -1 || list == null || lineIndex >= list.length) { try { if (!seqReader.nextKeyValue()) { return false; } } catch (InterruptedException e1) { throw new IOException(e1); } if(this.searcher!=null) { this.searcher.close(); } if(this.reader!=null) { this.reader.close(); } if(this.dir!=null) { this.dir.getDir().close(); } this.dir = seqReader.getCurrentValue(); try{ this.reader = IndexReader.open(dir.getDir()); }catch(IOException e) { throw new IOException(this.split.toString()+"@@@"+dir.getDir().toString()+"@@@"+dir.getDir().getClass().getName(), e); } StandardAnalyzer an = new StandardAnalyzer(Version.LUCENE_35); QueryParser q = new QueryParser(Version.LUCENE_35, "index", an); this.searcher = new IndexSearcher(reader); TopDocs docs; try { docs = this.searcher.search(q.parse(this.lucene_query), 10000000); } catch (ParseException e) { throw new RuntimeException(e); } this.list = docs.scoreDocs; this.lineIndex=0; } ScoreDoc doc=this.list[this.lineIndex]; Document document = this.searcher.doc(doc.doc,new MapFieldSelector(this.lucene_fields)); HashMap<String,String> val=new HashMap<String,String>(); for (Fieldable f : document.getFields()) { String fname=f.name(); val.put(fname, document.getFieldable(fname).stringValue()); } StringBuffer buff = new StringBuffer(); String joinchar=""; for(String f:this.rowFields) { buff.append(joinchar); if(val.containsKey(f)) { buff.append(val.get(f)); }else{ buff.append("-"); } joinchar="\001"; } pos.set(this.seqReader.getpos()); String line=buff.toString(); byte[] textBytes = line.getBytes(); int length = line.length(); k.set(textBytes, 0, textBytes.length); lineIndex++; return true; } public void close() throws IOException { seqReader.close(); } public LongWritable createKey() { return new LongWritable(); } public BytesWritable createValue() { return new BytesWritable(); } public long getPos() throws IOException { return seqReader.getpos(); } public float getProgress() throws IOException { return seqReader.getProgress(); } } } ################################################################################### package com.alipay.higo.hadoop.sequenceIndex; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.Metadata; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.HadoopDirectory; public class 
SequenceIndexOutputFormat extends FileOutputFormat<Text, HadoopDirectory> {

    public RecordWriter<Text, HadoopDirectory> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path file = getDefaultWorkFile(context, "");
        FileSystem fs = file.getFileSystem(conf);
        final SequenceIndex.Writer out = SequenceIndex.create(fs, conf, file,
                conf.getInt("io.file.buffer.size", 4096),
                fs.getDefaultReplication(), fs.getDefaultBlockSize(),
                null, new Metadata());
        return new RecordWriter<Text, HadoopDirectory>() {
            public void write(Text key, HadoopDirectory value) throws IOException {
                out.append(key, value.getDir());
            }

            public void close(TaskAttemptContext context) throws IOException {
                out.close();
            }
        };
    }
}