import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.util.Bytes; /** * * @author eryk * */ public class HFileDeleteData { private FileSystem _fs; public static HTablePool pool = null; private LinkedList<Path> hfileList = new LinkedList<Path>(); public HFileDeleteData(FileSystem fs, String tableName) { this._fs = fs; _tableName = tableName; } private final String HBASE_DIR = "/hbase/"; private String _tableName; /** * * @param args * @throws IOException */ public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); // XXX args HFileDeleteData hd = new HFileDeleteData(fs, args[2]); ExecutorService es = Executors.newFixedThreadPool(Integer.parseInt(args[1])); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.DAY_OF_MONTH, Integer.parseInt("-"+args[0])); String date = new SimpleDateFormat("yyyyMMdd").format(calendar.getTime()); System.out.println(date+" ---- "+calendar.getTimeInMillis()); Configuration hbase_conf = HBaseConfiguration.create(); HTable table = new HTable(hbase_conf, hd._tableName); HTableDescriptor desc = table.getTableDescriptor(); HColumnDescriptor[] hcd = desc.getColumnFamilies(); for (int i = 0; i < hcd.length; i++) { hd.fileList(hd.HBASE_DIR + hd._tableName, Bytes.toString(hcd[i].getName())); } System.out.println("total hfile:" + hd.hfileList.size()); HTable htable = getTable(hd._tableName); long time = calendar.getTimeInMillis(); for (Path path : hd.hfileList) { Path pathNew = path; es.execute(hd.new DelRow(fs, htable, pathNew, time)); } table.flushCommits(); table.close(); es.shutdown(); } // read region hile path to hfileList /** * * @param path * /hbase/tableName * @param cf * table * @throws IOException */ public void fileList(String path, String cf) throws IOException { FileStatus[] fileList = _fs.listStatus(new Path(path)); for (int i = 0; i < fileList.length; i++) { FileStatus[] tmpList = _fs.listStatus(new Path(fileList[i].getPath().toString() + "/" + cf)); for (int j = 0; j < tmpList.length; j++) { hfileList.add(tmpList[j].getPath()); } } } private static HTable getTable(String tableName) { Configuration conf = HBaseConfiguration.create(); if (pool == null) { pool = new HTablePool(conf, 500); } return (HTable) pool.getTable(tableName); } public class DelRow extends Thread { private long deadline; private HTable table; private Path path; private FileSystem fs; public DelRow(FileSystem fs, HTable table, Path hfile, long deadline) { this.fs = fs; this.table = table; this.path = hfile; this.deadline = deadline; } @Override public void run() { long count = 0; HFile.Reader hreader; try { hreader = new HFile.Reader(fs, path, null, false); hreader.loadFileInfo(); HFileScanner hscanner = hreader.getScanner(false, false); hscanner.seekTo(); while (hscanner.next()) { KeyValue kv = hscanner.getKeyValue(); if (kv.getTimestamp() < deadline || kv.getValueLength() == 0) { Delete d = new Delete(kv.getRow()); table.delete(d); count++; } } } catch (IOException e) { e.printStackTrace(); } System.out.println("total delete:" + count); } } }