
Hive with Indexes

February 18, 2014

A Hive query is not fundamentally different from a plain Hadoop MapReduce job: both brute-force scan the raw data. If the data could be indexed the way a database does it, scan speed would improve dramatically.
I previously used this kind of index directly in MapReduce; see:
http://user.qzone.qq.com/165162897/blog/1351432946
This post extends that work to Hive (at its core it is again just a special InputFormat). A short sketch of the idea comes first, then the step-by-step usage.
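
Here is a minimal, self-contained sketch of the core idea: every data block carries its own small Lucene index, and a filter such as col1:1* is answered from that index, so only matching rows are ever handed to the scan. The class name BlockFilterSketch and the sample data are illustrative only; the real plumbing (serializing the per-block indexes into HDFS and reading them back inside an InputFormat) is in the full source at the end of the post.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;

public class BlockFilterSketch {
  public static void main(String[] args) throws Exception {
    // Build a tiny in-memory index standing in for one data block.
    RAMDirectory dir = new RAMDirectory();
    StandardAnalyzer an = new StandardAnalyzer(Version.LUCENE_35);
    IndexWriter writer = new IndexWriter(dir, an, MaxFieldLength.UNLIMITED);
    for (int i = 0; i < 100; i++) {
      Document doc = new Document();
      doc.add(new Field("col1", String.valueOf(i), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
      doc.add(new Field("col3", "value" + i, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
      writer.addDocument(doc);
    }
    writer.close();

    // Answer the filter from the index instead of scanning every row of the block.
    IndexReader reader = IndexReader.open(dir);
    IndexSearcher searcher = new IndexSearcher(reader);
    QueryParser parser = new QueryParser(Version.LUCENE_35, "col1", an);
    ScoreDoc[] hits = searcher.search(parser.parse("col1:1*"), 1000).scoreDocs;
    for (ScoreDoc hit : hits) {
      Document d = searcher.doc(hit.doc);
      // Join the selected columns with \001, the delimiter the Hive table declares.
      System.out.println(d.get("col1") + "\001" + d.get("col3"));
    }
    searcher.close();
    reader.close();
  }
}

The Hive RecordReader shown later (SequenceIndexInputFormatForHive) does essentially this for every block, then joins the selected columns with \001 so Hive can parse them as ordinary rows.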


1. Create the index (nothing much to say here; just look at the source code later on)
hadoop jar ./higo-manager-1.3.1-SNAPSHOT.jar com.alipay.higo.hadoop.sequenceIndex.SequenceIndexExample create /group/tbdev/lingning/yannian.mu/input/1.txt /group/tbdev/lingning/yannian.mu/output 20


2. Create the Hive table (nothing special apart from the INPUTFORMAT)
CREATE EXTERNAL TABLE yannian_hive_index_test(col1 String,col2 String,col3 String,col4 String,col5 String)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'
STORED AS INPUTFORMAT 'com.alipay.higo.hadoop.sequenceIndex.SequenceIndexInputFormatForHive'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/group/tbdev/lingning/yannian.mu/output';

3. Add the required jars
add jar ./higo-index-1.3.1-SNAPSHOT.jar;   -- the jar containing the custom InputFormat
add jar ./lucene-core-3.5-SNAPSHOT.jar;   -- the Lucene dependency

4. Lucene-based filtering before the query
-- Declare the table's columns; the order and count must exactly match the CREATE TABLE statement
set hive.fields.sequence=col1,col2,col3,col4,col5;
-- Columns the query actually needs; only these fields are read from the index
set lucene.fields=col1,col3,col2;
-- The Lucene filter; here only rows whose col1 value starts with 1 are scanned
set lucene.query=col1:1*;
 

5. Analyze the Lucene-filtered rows with ordinary Hive
select col1,col3 from yannian_hive_index_test limit 1000;

Not too much trouble, is it? The complete implementation follows, starting with SequenceIndex.java; each ### divider below marks the start of the next source file.

package com.alipay.higo.hadoop.sequenceIndex;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.rmi.server.UID;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
/**
 * A SequenceFile-style sequential container of Lucene indexes: each record is a key plus
 * the serialized files of one Lucene Directory, separated by sync markers so the file can
 * be split.
 *
 * @author yannian.mu
 *
 */
public class SequenceIndex {
 private static final Log LOG = LogFactory.getLog(SequenceIndex.class);
 private static final byte VERSION_WITH_METADATA = (byte) 6;
 private static final int SYNC_ESCAPE = -1; // "length" of sync entries
 private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
 private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash
 public static final int SYNC_INTERVAL = 100 * SYNC_SIZE;
 private static byte[] VERSION = new byte[] { (byte) 'S', (byte) 'E',
   (byte) 'I', VERSION_WITH_METADATA };
 
 public static Writer create(FileSystem fs, Configuration conf, Path name,
   int bufferSize, short replication, long blockSize,
   Progressable progress, Metadata metadata) throws IOException
 {
  return new Writer(fs, conf, name, bufferSize, replication, blockSize, progress, metadata);
 }
 
 public static Reader open(FileSystem fs, Path file, int bufferSize, long start,
   long length, Configuration conf, boolean tempReader) throws IOException
 {
  return new Reader(fs, file, bufferSize, start, length, conf, tempReader);
 }
 public static class Writer implements java.io.Closeable {
  Configuration conf;
  FSDataOutputStream out;
  boolean ownOutputStream = true;
  DataOutputBuffer buffer = new DataOutputBuffer();
  Metadata metadata = null;
  long lastSyncPos; // position of last sync
  byte[] sync; // 16 random bytes
  {
   try {
    MessageDigest digester = MessageDigest.getInstance("MD5");
    long time = System.currentTimeMillis();
    digester.update((new UID() + "@" + time).getBytes());
    sync = digester.digest();
   } catch (Exception e) {
    throw new RuntimeException(e);
   }
  }
  public Writer(FileSystem fs, Configuration conf, Path name,
    int bufferSize, short replication, long blockSize,
    Progressable progress, Metadata metadata) throws IOException {
   this.conf = conf;
   this.out = fs.create(name, true, bufferSize, replication,
     blockSize, progress);
   this.metadata = metadata;
   out.write(VERSION);
   this.metadata.write(out);
   out.write(sync); // write the sync bytes
   out.flush();
  }
  public void sync() throws IOException {
   if (sync != null && lastSyncPos != out.getPos()) {
    out.writeInt(SYNC_ESCAPE); // mark the start of the sync
    out.write(sync); // write sync
    lastSyncPos = out.getPos(); // update lastSyncPos
   }
  }
  public Configuration getConf() {
   return conf;
  }
  public synchronized void close() throws IOException {
   if (out != null) {
    if (ownOutputStream) {
     out.close();
    } else {
     out.flush();
    }
    out = null;
   }
  }
  synchronized void checkAndWriteSync() throws IOException {
   if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) { // time
    sync();
   }
  }
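  // On-disk record layout produced by append():
  //   [keyLength:int][keyBytes][fileCount:int] followed, for every Lucene file in the
  //   Directory, by [nameLength:int][nameBytes][fileLength:long][raw file bytes].
  // checkAndWriteSync() interleaves a sync marker (SYNC_ESCAPE plus the 16-byte hash)
  // roughly every SYNC_INTERVAL bytes so readers can realign at split boundaries, just
  // like a SequenceFile.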
  public synchronized void append(Text key, Directory dir)
    throws IOException {
   checkAndWriteSync();
   String[] names=dir.listAll();
   out.writeInt(key.getLength());
   out.write(key.getBytes(), 0, key.getLength());
   out.writeInt(names.length);
    for (String name : names) {
     Text nameText = new Text(name);
     out.writeInt(nameText.getLength());
     out.write(nameText.getBytes(), 0, nameText.getLength());
     long filelen = dir.fileLength(name);
     out.writeLong(filelen);
     IndexInput input = dir.openInput(name);
     try {
      this.writeTo(filelen, input, out);
     } finally {
      input.close(); // don't leak the per-file IndexInput
     }
    }
 
  }
  
    private void writeTo(long end,IndexInput input,FSDataOutputStream out) throws IOException {
       long pos = 0;
       int bufflen=1024;
       while (pos < end) {
         int length = bufflen;
         long nextPos = pos + length;
         if (nextPos > end) {                        // at the last buffer
           length = (int)(end - pos);
         }
        byte[] buff=new byte[length];
        input.readBytes(buff, 0, length);
         
         out.write(buff,0,length);
         pos = nextPos;
       }
     }
  public synchronized long getLength() throws IOException {
   return out.getPos();
  }
 }
 public static class Reader implements java.io.Closeable {
  private Path file;
  private FSDataInputStream in;
  private FSDataInputStream shardIn;
  private byte version;
  private Metadata metadata = null;
  private byte[] sync = new byte[SYNC_HASH_SIZE];
  private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
  private boolean syncSeen;
  private long end;
  private Configuration conf;
  
  private Reader(FileSystem fs, Path file, int bufferSize, long start,
    long length, Configuration conf, boolean tempReader)
    throws IOException {
   this.file = file;
   this.in = fs.open(file, bufferSize);
   this.shardIn=fs.open(file, bufferSize);
   this.conf = conf;
   seek(start);
   this.end = in.getPos() + length;
   init(tempReader);
  }
  private void init(boolean tempReader) throws IOException {
   byte[] versionBlock = new byte[VERSION.length];
   in.readFully(versionBlock);
   if ((versionBlock[0] != VERSION[0])
     || (versionBlock[1] != VERSION[1])
     || (versionBlock[2] != VERSION[2]))
    throw new IOException(file + " not a SequenceIndex");
   version = versionBlock[3];
   if (version > VERSION[3])
    throw new VersionMismatchException(VERSION[3], version);
   this.metadata = new Metadata();
   if (version >= VERSION_WITH_METADATA) { // if version >= 6
    this.metadata.readFields(in);
   }
   if (version > 1) { // if version > 1
    in.readFully(sync); // read sync bytes
   }
  }
  public synchronized void close() throws IOException {
   in.close();
   this.shardIn.close();
  }
  private synchronized int readKeyLength() throws IOException {
   if (in.getPos() >= end) {
    return -1;
   }
   int length = in.readInt();
   if (version > 1 && sync != null && length == SYNC_ESCAPE) { // process
    in.readFully(syncCheck); // read syncCheck
    if (!Arrays.equals(sync, syncCheck)) // check it
     throw new IOException("File is corrupt!");
    syncSeen = true;
    if (in.getPos() >= end) {
     return -1;
    }
    length = in.readInt(); // re-read length
   } else {
    syncSeen = false;
   }
   return length;
  }
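  // Reads the next record: restores the key, then registers each embedded Lucene file's
  // (offset, length) in the SequenceIndexDirectory and skips over its bytes. The file
  // contents are only read later, on demand, through the shared input stream.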
  public synchronized int next(Text key, SequenceIndexDirectory dir)
    throws IOException {
   int length = readKeyLength();
   if (length == -1) {
    return -1;
   }
   
   dir.setShareStream(this.shardIn);
   
   byte[] keydata = new byte[length];
   in.readFully(keydata, 0, length); // readFully: a short read here would corrupt the key
   key.set(keydata);
   int filecount = in.readInt();
   for (int i = 0; i < filecount; i++) {
    int namelen = in.readInt();
    byte[] namebyte = new byte[namelen];
    in.readFully(namebyte, 0, namelen);
    Text name = new Text(namebyte);
    long filelen = in.readLong();
    long pos = in.getPos();
    in.skip(filelen);
    dir.addFile(name.toString(), pos, filelen);
   }
   return length;
  }
  public Metadata getMetadata() {
   return this.metadata;
  }
  Configuration getConf() {
   return conf;
  }
  public synchronized void seek(long position) throws IOException {
   in.seek(position);
  }
  public synchronized void sync(long position) throws IOException {
   if (position + SYNC_SIZE >= end) {
    seek(end);
    return;
   }
   try {
    seek(position + 4); // skip escape
    in.readFully(syncCheck);
    int syncLen = sync.length;
    for (int i = 0; in.getPos() < end; i++) {
     int j = 0;
     for (; j < syncLen; j++) {
      if (sync[j] != syncCheck[(i + j) % syncLen])
       break;
     }
     if (j == syncLen) {
      in.seek(in.getPos() - SYNC_SIZE); // position before
               // sync
      return;
     }
     syncCheck[i % syncLen] = in.readByte();
    }
   } catch (ChecksumException e) { // checksum failure
    handleChecksumException(e);
   }
  }
  private void handleChecksumException(ChecksumException e)
    throws IOException {
   if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
    LOG.warn("Bad checksum at " + getPosition() + ". Skipping entries.");
    sync(getPosition()+ this.conf.getInt("io.bytes.per.checksum", 512));
   } else {
    throw e;
   }
  }
  public boolean syncSeen() {
   return syncSeen;
  }
  public synchronized long getPosition() throws IOException {
   return in.getPos();
  }
  public String toString() {
   return file.toString();
  }
 }
 
 
 public static class HadoopDirectory implements WritableComparable{
  Directory dir=null;
  public Directory getDir() {
   return dir;
  }
  public void setDir(Directory dir) {
   this.dir = dir;
  }
  @Override
  public void write(DataOutput out) throws IOException {
   throw new UnsupportedOperationException();
  }
  @Override
  public void readFields(DataInput in) throws IOException {
   throw new UnsupportedOperationException();
  }
  @Override
  public int compareTo(Object arg0) {
   throw new UnsupportedOperationException();
  }
 }
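 // A read-only Lucene Directory backed by byte ranges of the SequenceIndex file. All
 // sub-files share one FSDataInputStream; ShareIndexInput seeks to fileOffset + position
 // on every read, so no data is copied into memory up front.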
 public static class SequenceIndexDirectory extends Directory {
  private static int BUFFER_SIZE = 1024;
  private static final class FileEntry {
   long offset;
   long length;
   public FileEntry(long offset, long length) {
    this.offset = offset;
    this.length = length;
   }
  }
  private FSDataInputStream shareStream;
  private HashMap<String, FileEntry> entries = new HashMap<String, FileEntry>();
  @Override
  public synchronized void close() throws IOException {
   if (shareStream == null)
    throw new IOException("Already closed");
   entries.clear();
   shareStream = null;
  }
  public void setShareStream(FSDataInputStream _stream) {
   this.shareStream = _stream;
  }
  public void addFile(String name, long offset, long length) {
   entries.put(name, new FileEntry(offset, length));
  }
  @Override
  public synchronized IndexInput openInput(String id) throws IOException {
   return openInput(id, BUFFER_SIZE);
  }
  @Override
  public synchronized IndexInput openInput(String id, int readBufferSize)
    throws IOException {
   if (shareStream == null)
    throw new IOException("Stream closed");
   FileEntry entry = entries.get(id);
   if (entry == null) {
    throw new IOException("No sub-file with id " + id
      + " found (files: " + entries.keySet() + ")");
   }
   return new ShareIndexInput(id,shareStream, entry.offset,
     entry.length);
  }
  @Override
  public String[] listAll() {
   return entries.keySet().toArray(new String[entries.size()]);
  }
  @Override
  public boolean fileExists(String name) {
   return entries.containsKey(name);
  }
  @Override
  public long fileModified(String name) throws IOException {
   throw new UnsupportedOperationException();
  }
  @Override
  @Deprecated
  public void touchFile(String name) throws IOException {
   throw new UnsupportedOperationException();
  }
  @Override
  public void deleteFile(String name) {
   throw new UnsupportedOperationException();
  }
  public void renameFile(String from, String to) {
   throw new UnsupportedOperationException();
  }
  @Override
  public long fileLength(String name) throws IOException {
   FileEntry e = entries.get(name);
   if (e == null)
    throw new FileNotFoundException(name);
   return e.length;
  }
  @Override
  public IndexOutput createOutput(String name) {
   throw new UnsupportedOperationException();
  }
  @Override
  public Lock makeLock(String name) {
   throw new UnsupportedOperationException();
  }
  public static class ShareIndexInput extends BufferedIndexInput {
   public class Descriptor{
    
    FSDataInputStream in=null;
    public FSDataInputStream getIn() {
     return in;
    }
    public void setIn(FSDataInputStream in) {
     this.in = in;
    }
    
    public void close()
    {
     
    }
   }
   private final Descriptor descriptor;
   private final long length;
   @Override
   public String toString() {
    return "ShareIndexInput [length=" + length + ", fileOffset="
      + fileOffset + ", filename=" + filename + "]";
   }
   private boolean isOpen;
   private boolean isClone;
   private long fileOffset;
   private String filename;
   public ShareIndexInput(String _filename,FSDataInputStream shareStream,
     long _fileOffset, long _length) throws IOException {
    super("sequenceIndex input");
    this.filename=_filename;
    this.descriptor = new Descriptor();
    this.descriptor.setIn(shareStream);
    
    this.fileOffset = _fileOffset;
    this.length = _length;
    this.isOpen = true;
    this.isClone = false;
   }

   protected void readInternal(byte[] b, int offset, int len)
     throws IOException {

    synchronized (descriptor.in) {
     long position = getFilePointer();
     if ((position+this.fileOffset) != descriptor.in.getPos()) {
      descriptor.in.seek(position+this.fileOffset);
     }
     int total = 0;
     do {
      int i = descriptor.in.read(b, offset + total, len
        - total);
      if (i == -1) {
       throw new IOException("Read past EOF");
      }
      total += i;
     } while (total < len);
    }
   }
   public void close() throws IOException {
    if (!isClone) {
     if (isOpen) {
      descriptor.close();
      isOpen = false;
     } else {
      throw new IOException("Index file already closed");
     }
    }
   }
   public long length() {
    return length;
   }
   protected void finalize() throws IOException {
    if (!isClone && isOpen) {
     close();
    }
   }
   public Object clone() {
    ShareIndexInput clone = (ShareIndexInput) super.clone();
    clone.isClone = true;
    return clone;
   }
   @Override
   protected void seekInternal(long pos) throws IOException {
    
   }
   
  }
  
 }
}

########################### SequenceIndexExample.java ###########################
package com.alipay.higo.hadoop.sequenceIndex;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.document.Field.Index;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.queryParser.ParseException;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Version;
import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.HadoopDirectory;
public class SequenceIndexExample {
 public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  String type=args[0];
  String input=args[1];
  String output=args[2];
  Integer numreduce=Integer.parseInt(args[3]);
  if (type.equals("create")) {
   create(input, output, numreduce);
  } else if (type.equals("searchold")) {
   searchOld(input, output, numreduce);
  } else {
   search(input, output, numreduce);
  }
 }
 private static void search(String input,String output,int numreduce) throws IOException, InterruptedException, ClassNotFoundException
 {
  Job job = new Job(new Configuration());
  job.setInputFormatClass(SequenceIndexInputFormat.class);
  SequenceIndexInputFormat.addInputPath(job, new Path(input));
  job.setMapperClass(IndexMap.class);
  job.setJarByClass(SequenceIndexExample.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  TextOutputFormat.setOutputPath(job, new Path(output));
  job.setNumReduceTasks(numreduce);
  job.waitForCompletion(true);
 }
 
 private static void searchOld(String input,String output,int numreduce) throws IOException, InterruptedException, ClassNotFoundException
 {
  Configuration conf=new Configuration();
  conf.set("hive.fields.sequence","index,col1,col2,col3,col4,col5");
  conf.set("lucene.fields","index,col3");
  conf.set("lucene.query","index:500");
  JobConf jobconf=new JobConf(conf, SequenceIndexInputFormatForHive.class);
  jobconf.setJobName("oldsearch");
  jobconf.setNumReduceTasks(numreduce);
  jobconf.setInputFormat(SequenceIndexInputFormatForHive.class);
  jobconf.setMapperClass(OldMapper.class);
  
  jobconf.setOutputKeyClass(Text.class);
  jobconf.setOutputValueClass(Text.class);
  SequenceIndexInputFormatForHive.addInputPath(jobconf, new Path(input));
  org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobconf,new Path(output));
        RunningJob rj = JobClient.runJob(jobconf);
 }
 
  public static class OldMapper implements org.apache.hadoop.mapred.Mapper<LongWritable, BytesWritable, Text, Text> {
  @Override
  public void configure(JobConf job) {
   
  }
  @Override
  public void close() throws IOException {
   
  }
  @Override
  public void map(LongWritable key, BytesWritable value,
    OutputCollector<Text, Text> output, Reporter reporter)
    throws IOException {
   // BytesWritable.get() exposes the whole backing buffer; copy only the valid bytes.
   Text row = new Text();
   row.set(value.getBytes(), 0, value.getLength());
   output.collect(new Text(String.valueOf(key.get())), row);
  }
      
     }
 
 private static void create(String input,String output,int numreduce) throws IOException, InterruptedException, ClassNotFoundException
 {
  Job job = new Job(new Configuration());
  FileInputFormat.addInputPath(job, new Path(input));
  job.setJarByClass(SequenceIndexExample.class);
  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(Text.class);
  
  job.setReducerClass(IndexReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(HadoopDirectory.class);
  job.setOutputFormatClass(SequenceIndexOutputFormat.class);
  SequenceIndexOutputFormat.setOutputPath(job, new Path(output));
  job.setNumReduceTasks(numreduce);
  job.waitForCompletion(true);
 }
 
 public static class IndexMap extends
   Mapper<Text, HadoopDirectory, Text, Text> {
  protected void map(Text key, HadoopDirectory value, Context context)
    throws IOException, InterruptedException {
   Directory dir = value.getDir();
   IndexReader reader = IndexReader.open(dir);
   StandardAnalyzer an = new StandardAnalyzer(Version.LUCENE_35);
   QueryParser q = new QueryParser(Version.LUCENE_35, "index", an);
   IndexSearcher searcher = new IndexSearcher(reader);
   TopDocs docs;
   try {
    docs = searcher.search(q.parse("index:500"), 20);
   } catch (ParseException e) {
    throw new RuntimeException(e);
   }
   ScoreDoc[] list = docs.scoreDocs;
   if (list != null && list.length > 0) {
    StringBuffer buff = new StringBuffer();
    for (ScoreDoc doc : list) {
     Document document = searcher.doc(doc.doc);
     for (Fieldable f : document.getFields()) {
      buff.append(f.name() + "="
        + document.getFieldable(f.name()).stringValue()
        + ",");
     }
     context.write(key, new Text(buff.toString()));
    }
   }
  }
 }
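 // Demo reducer that fabricates test data: 10000 blocks, each an in-memory Lucene index of
 // 1000 documents (fields index and col0..col9), written out as one SequenceIndex entry per
 // block. The setup flag makes it run its loop only once, regardless of the incoming keys.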
 public static class IndexReducer extends
    Reducer<LongWritable, Text, Text, HadoopDirectory> {
  boolean setup=false;
    protected void reduce(LongWritable key, Iterable<Text> values,
        Context context) throws java.io.IOException, InterruptedException {
   if(setup)
   {
    return;
   }
   setup=true;
     for(int k=0;k<10000;k++)
     {
      HadoopDirectory hdir=new HadoopDirectory();
      hdir.setDir(new RAMDirectory());
      
      IndexWriter writer = new IndexWriter(hdir.getDir(), null,
             new KeepOnlyLastCommitDeletionPolicy(),
             MaxFieldLength.UNLIMITED);
     writer.setUseCompoundFile(false);
     writer.setMergeFactor(2);
    System.out.println(k);
    
    for(int i=0;i<1000;i++)
    {
     Document doc=new Document();
     doc.add(new Field("index", String.valueOf(i), Store.YES, Index.NOT_ANALYZED_NO_NORMS));
     for(int j=0;j<10;j++)
     {
      doc.add(new Field("col"+j, String.valueOf(i)+","+j+","+k, Store.YES, Index.NOT_ANALYZED_NO_NORMS));
     }
     writer.addDocument(doc);
    }
    
    writer.optimize();
    writer.close();
    context.write(new Text(String.valueOf(k)), hdir);
     }
     
    }
  
 }
}
########################### SequenceIndexInputFormat.java ###########################
package com.alipay.higo.hadoop.sequenceIndex;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.HadoopDirectory;
import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.SequenceIndexDirectory;
public class SequenceIndexInputFormat extends FileInputFormat<Text,HadoopDirectory>{
 @Override
   public RecordReader<Text,HadoopDirectory> createRecordReader(InputSplit split,
                                                TaskAttemptContext context
                                                ) throws IOException {
  try {
   return  new SequenceIndexRecordReader(split,context);
  } catch (InterruptedException e) {
   throw new IOException(e);
  }
   }
   @Override
   protected long getFormatMinSplitSize() {
     return SequenceIndex.SYNC_INTERVAL;
   }
   
   public static class SequenceIndexRecordReader extends RecordReader<Text,HadoopDirectory>{
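     // Yields one (Text key, HadoopDirectory) pair per index block. The HadoopDirectory
     // wraps a SequenceIndexDirectory, so the block's Lucene files are read lazily from
     // HDFS rather than materialized. Initialization happens in the constructors because
     // the Hive record reader below builds this class directly, outside the MapReduce
     // lifecycle, which is why initialize() is empty.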
    private SequenceIndex.Reader in;
    private long start;
    private long end;
    private boolean more = true;
    private Text key = null;
    private HadoopDirectory value = null;
    protected Configuration conf;
    public void initialize(InputSplit split, 
                  TaskAttemptContext context
                  ) throws IOException, InterruptedException {
    }
    public SequenceIndexRecordReader(InputSplit split, 
                           TaskAttemptContext context
                           ) throws IOException, InterruptedException {
     FileSplit fileSplit = (FileSplit) split;
      this.init(context.getConfiguration(), fileSplit.getPath(), fileSplit.getStart(), fileSplit.getLength());
    }
    
    
  public SequenceIndexRecordReader(Configuration _conf,Path _path,long _start,long _len) throws IOException,
    InterruptedException {
   this.init(_conf, _path, _start, _len);
  }
  
  private void init(Configuration _conf, Path path,
    long _start, long len) throws IOException, InterruptedException {
   conf = _conf;
   FileSystem fs = path.getFileSystem(conf);
   this.in = SequenceIndex.open(fs, path, conf.getInt(
     "io.file.buffer.size", 4096), 0, fs.getFileStatus(path)
     .getLen(), conf, false);// new SequenceFile.Reader(fs, path,
   this.end = _start + len;
   if (_start > in.getPosition()) {
    in.sync(_start); // sync to start
   }
   this.start = in.getPosition();
   more = _start < end;
  }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      if (!more) {
        return false;
      }
      long pos = in.getPosition();
      
      this.key=new Text();
      this.value=new HadoopDirectory();
      SequenceIndexDirectory dir=new SequenceIndexDirectory();
      this.value.setDir(dir);
      if(this.in.next(this.key, dir)<0||(pos >= end && in.syncSeen()))
      {
       more = false;
       key = null;
       value = null;
      }
      return more;
    }
    @Override
    public Text getCurrentKey() {
      return key;
    }
    
    @Override
    public HadoopDirectory getCurrentValue() {
      return value;
    }
    
    public long getpos() throws IOException
    {
     return in.getPosition();
    }
    
    public float getProgress() throws IOException {
      if (end == start) {
        return 0.0f;
      } else {
        return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
      }
    }
    
    public synchronized void close() throws IOException { in.close(); }
   
   }
}

########################### SequenceIndexInputFormatForHive.java ###########################
package com.alipay.higo.hadoop.sequenceIndex;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.document.MapFieldSelector;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.queryParser.ParseException;
import org.apache.lucene.queryParser.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.Version;
import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.HadoopDirectory;
import com.alipay.higo.hadoop.sequenceIndex.SequenceIndexInputFormat.SequenceIndexRecordReader;

public class SequenceIndexInputFormatForHive extends SequenceFileInputFormat<LongWritable, BytesWritable> {
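 // Bridges SequenceIndex into Hive's old mapred API: every Lucene document that matches
 // lucene.query becomes one \001-delimited line whose column order follows
 // hive.fields.sequence, so Hive consumes the pre-filtered data as if it were plain text.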
 public RecordReader<LongWritable, BytesWritable> getRecordReader(
   InputSplit split, JobConf job, Reporter reporter)
   throws IOException {
  FileSplit part = (FileSplit) split;
  return new HiveTarRecordReader(job, part);
 }

 public static class HiveTarRecordReader implements
   RecordReader<LongWritable, BytesWritable> {
  private SequenceIndexRecordReader seqReader = null;
  IndexReader reader=null;
  private String hive_fields = "";
  private ArrayList<String> rowFields = new ArrayList<String>();
  private ArrayList<String> lucene_fields = new ArrayList<String>();
  private String lucene_query = "";
  private HadoopDirectory dir;
  private IndexSearcher searcher;
  private ScoreDoc[] list;
  private int lineIndex = -1;
  FileSplit split;
  public HiveTarRecordReader(Configuration conf, FileSplit _split)
    throws IOException {
   this.hive_fields = conf.get("hive.fields.sequence","");
   this.split=_split;
   for(String f:this.hive_fields.split(","))
   {
    this.rowFields.add(f);
   }
   
   for(String f:conf.get("lucene.fields","").split(","))
   {
    this.lucene_fields.add(f);
   }
   
   this.lucene_query = conf.get("lucene.query");
   try {
    seqReader = new SequenceIndexRecordReader(conf,_split.getPath(),_split.getStart(),_split.getLength());
   } catch (InterruptedException e) {
    throw new IOException(e);
   }
  }
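  // On each call: if the current block's hit list is exhausted, advance to the next block,
  // open its index, and run lucene.query against it (loading only the columns listed in
  // lucene.fields); then emit the current hit as a \001-delimited row, padding missing
  // columns with "-".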
  public synchronized boolean next(LongWritable pos, BytesWritable k)
    throws IOException {
   while (lineIndex == -1 || list == null || lineIndex >= list.length) {
    try {
     if (!seqReader.nextKeyValue()) {
      return false;
     }
    } catch (InterruptedException e1) {
     throw new IOException(e1);
    }
    
    if(this.searcher!=null)
    {
     this.searcher.close();
    }
    if(this.reader!=null)
    {
     this.reader.close();
    }
    if(this.dir!=null)
    {
     this.dir.getDir().close();
    }
    
    this.dir = seqReader.getCurrentValue();
    try{
     this.reader = IndexReader.open(dir.getDir());
    }catch(IOException e)
    {
     throw new IOException(this.split.toString()+"@@@"+dir.getDir().toString()+"@@@"+dir.getDir().getClass().getName(), e);
    }
    StandardAnalyzer an = new StandardAnalyzer(Version.LUCENE_35);
    QueryParser q = new QueryParser(Version.LUCENE_35, "index", an);
    this.searcher = new IndexSearcher(reader);
    TopDocs docs;
    try {
     docs = this.searcher.search(q.parse(this.lucene_query), 10000000);
    } catch (ParseException e) {
     throw new RuntimeException(e);
    }
    this.list = docs.scoreDocs;
    this.lineIndex=0;
   }
   
   
   ScoreDoc doc=this.list[this.lineIndex];
   Document document = this.searcher.doc(doc.doc,new MapFieldSelector(this.lucene_fields));
   HashMap<String,String> val=new HashMap<String,String>();
   for (Fieldable f : document.getFields()) {
    String fname=f.name();
    val.put(fname, document.getFieldable(fname).stringValue());
   }
   
   StringBuffer buff = new StringBuffer();
   String joinchar="";
   for(String f:this.rowFields)
   {
    buff.append(joinchar);    
    if(val.containsKey(f))
    {
     buff.append(val.get(f));
    }else{
     buff.append("-");
    }
    
    joinchar="\001";
   }
   
    pos.set(this.seqReader.getpos());
    byte[] textBytes = buff.toString().getBytes();
    k.set(textBytes, 0, textBytes.length);
   
   lineIndex++;
   return true;
  }

        public void close() throws IOException {
            seqReader.close();
        }
        public LongWritable createKey() {
            return new LongWritable();
        }
        public BytesWritable createValue() {
            return new BytesWritable();
        }
        public long getPos() throws IOException {
            return seqReader.getpos();
        }
        public float getProgress() throws IOException {
            return seqReader.getProgress();
        }
    }
}
########################### SequenceIndexOutputFormat.java ###########################
package com.alipay.higo.hadoop.sequenceIndex;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.alipay.higo.hadoop.sequenceIndex.SequenceIndex.HadoopDirectory;

public class SequenceIndexOutputFormat extends FileOutputFormat<Text,HadoopDirectory>{
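   // Writes each (Text key, HadoopDirectory) pair with SequenceIndex.Writer, serializing
   // the value's Lucene files inline into the output file.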
   public RecordWriter<Text,HadoopDirectory> 
          getRecordWriter(TaskAttemptContext context
                          ) throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
     Path file = getDefaultWorkFile(context, "");
     FileSystem fs = file.getFileSystem(conf);
     final SequenceIndex.Writer out = SequenceIndex.create(fs, conf, file,
       conf.getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(),
       fs.getDefaultBlockSize(), null, new Metadata());
     return new RecordWriter<Text,HadoopDirectory>() {
         public void write(Text key, HadoopDirectory value)
           throws IOException {
           out.append(key, value.getDir());
         }
         public void close(TaskAttemptContext context) throws IOException { 
           out.close();
         }
       };
   }
}
