现在的位置: 首页 > 综合 > 正文

读取HFile 按Timestamp 删除 HBase 数据

2018年04月09日 ⁄ 综合 ⁄ 共 3461字 ⁄ 字号 评论关闭
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Deletes HBase rows by reading a table's HFiles directly from the file system.
 *
 * <p>For every column family of the table, the store files under
 * {@code /hbase/<table>/<region>/<family>} are collected. Each file is then scanned by a
 * worker task, which issues a whole-row {@link Delete} for every row that contains at
 * least one cell whose timestamp is older than the cutoff, or whose value is empty.
 * A {@code Delete(row)} without further qualification removes the entire row, so cells
 * newer than the cutoff in such a row are deleted as well.
 *
 * <p>Usage: {@code HFileDeleteData <days> <threads> <tableName>}
 *
 * @author eryk
 */
public class HFileDeleteData {

	/** HBase root directory; NOTE(review): hard-coded, assumed to match hbase.rootdir. */
	private final String HBASE_DIR = "/hbase/";

	/** Shared pool of table handles, lazily created by {@link #getTable(String)}. */
	public static HTablePool pool = null;

	private FileSystem _fs;

	private String _tableName;

	/** Store-file paths collected by {@link #fileList(String, String)}. */
	private LinkedList<Path> hfileList = new LinkedList<Path>();

	/**
	 * @param fs        file system that holds the HBase root directory
	 * @param tableName name of the table whose rows are to be deleted
	 */
	public HFileDeleteData(FileSystem fs, String tableName) {
		this._fs = fs;
		this._tableName = tableName;
	}

	/**
	 * Entry point.
	 *
	 * @param args {@code args[0]} number of days to go back for the cutoff (non-negative),
	 *             {@code args[1]} number of worker threads, {@code args[2]} table name
	 * @throws IOException if the table schema or the table directory cannot be read
	 */
	public static void main(String[] args) throws IOException {
		if (args.length < 3) {
			System.err.println("Usage: HFileDeleteData <days> <threads> <tableName>");
			System.exit(1);
		}
		final int days = Integer.parseInt(args[0]);
		if (days < 0) {
			// A negative value would move the cutoff into the future and delete everything.
			throw new IllegalArgumentException("days must be >= 0, got " + days);
		}
		final int threads = Integer.parseInt(args[1]);
		final String tableName = args[2];

		Configuration conf = new Configuration();
		final FileSystem fs = FileSystem.get(conf);
		final HFileDeleteData hd = new HFileDeleteData(fs, tableName);

		Calendar calendar = Calendar.getInstance();
		calendar.add(Calendar.DAY_OF_MONTH, -days);
		final long time = calendar.getTimeInMillis();
		String date = new SimpleDateFormat("yyyyMMdd").format(calendar.getTime());
		System.out.println(date + "\t----\t" + time);

		// This handle is only needed to read the schema; close it before work starts.
		Configuration hbaseConf = HBaseConfiguration.create();
		HTable schemaTable = new HTable(hbaseConf, tableName);
		try {
			HTableDescriptor desc = schemaTable.getTableDescriptor();
			for (HColumnDescriptor family : desc.getColumnFamilies()) {
				hd.fileList(hd.HBASE_DIR + tableName, Bytes.toString(family.getName()));
			}
		} finally {
			schemaTable.close();
		}

		System.out.println("total hfile:" + hd.hfileList.size());

		ExecutorService es = Executors.newFixedThreadPool(threads);
		try {
			for (final Path path : hd.hfileList) {
				es.execute(new Runnable() {
					@Override
					public void run() {
						// HTable is not thread-safe: each task borrows its own handle
						// and hands it back to the pool when the file is done.
						HTable htable = getTable(tableName);
						try {
							hd.new DelRow(fs, htable, path, time).run();
						} finally {
							releaseTable(htable);
						}
					}
				});
			}
		} finally {
			es.shutdown();
		}
		try {
			// Keep main alive until every file has been processed.
			es.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * Appends the store files of one column family, across all region directories of
	 * the table, to {@code hfileList}.
	 *
	 * @param path table directory, e.g. {@code /hbase/tableName}
	 * @param cf   column family name
	 * @throws IOException if the table directory does not exist or cannot be listed
	 */
	public void fileList(String path, String cf) throws IOException {
		FileStatus[] regionDirs = _fs.listStatus(new Path(path));
		if (regionDirs == null) {
			throw new IOException("table directory not found: " + path);
		}
		for (FileStatus regionDir : regionDirs) {
			// Only directories can be regions; skip plain files in the table directory.
			if (!regionDir.isDir()) {
				continue;
			}
			Path familyDir = new Path(regionDir.getPath(), cf);
			// Not every entry contains this family (e.g. temp/log dirs); older Hadoop
			// returns null from listStatus for a missing path, newer Hadoop throws.
			if (!_fs.exists(familyDir)) {
				continue;
			}
			FileStatus[] storeFiles = _fs.listStatus(familyDir);
			if (storeFiles == null) {
				continue;
			}
			for (FileStatus storeFile : storeFiles) {
				if (!storeFile.isDir()) {
					hfileList.add(storeFile.getPath());
				}
			}
		}
	}

	/**
	 * Borrows a table handle from the shared pool, creating the pool on first use.
	 * Synchronized because worker threads call it concurrently.
	 * NOTE(review): the cast assumes the pool hands out {@link HTable} instances
	 * (true for the HBase version this targets); confirm when upgrading.
	 */
	private static synchronized HTable getTable(String tableName) {
		if (pool == null) {
			pool = new HTablePool(HBaseConfiguration.create(), 500);
		}
		return (HTable) pool.getTable(tableName);
	}

	/** Returns a handle obtained from {@link #getTable(String)} to the shared pool. */
	private static synchronized void releaseTable(HTable table) {
		if (pool != null && table != null) {
			pool.putTable(table);
		}
	}

	/**
	 * Scans a single HFile and deletes every row that has a cell older than
	 * {@code deadline} or a cell with an empty value.
	 *
	 * <p>Still extends {@link Thread} for source compatibility, but {@code main} only
	 * invokes {@link #run()} directly inside an executor task.
	 */
	public class DelRow extends Thread {

		private long deadline;
		private HTable table;
		private Path path;
		private FileSystem fs;

		/**
		 * @param fs       file system holding the HFile
		 * @param table    handle used to issue deletes; must not be shared across threads
		 * @param hfile    HFile to scan
		 * @param deadline cutoff in epoch milliseconds; cells older than this trigger a delete
		 */
		public DelRow(FileSystem fs, HTable table, Path hfile, long deadline) {
			this.fs = fs;
			this.table = table;
			this.path = hfile;
			this.deadline = deadline;
		}

		@Override
		public void run() {
			long count = 0;
			HFile.Reader hreader = null;
			try {
				hreader = new HFile.Reader(fs, path, null, false);
				hreader.loadFileInfo();

				HFileScanner hscanner = hreader.getScanner(false, false);
				// seekTo() already positions on the first entry (false => empty file),
				// so the current entry must be handled before advancing with next().
				if (hscanner.seekTo()) {
					byte[] lastDeletedRow = null;
					do {
						KeyValue kv = hscanner.getKeyValue();
						if (kv.getTimestamp() < deadline || kv.getValueLength() == 0) {
							byte[] row = kv.getRow();
							// HFile entries are sorted by row, so a row's cells are
							// contiguous: one whole-row Delete per row is sufficient.
							if (lastDeletedRow == null || !Bytes.equals(row, lastDeletedRow)) {
								table.delete(new Delete(row));
								lastDeletedRow = row;
								count++;
							}
						}
					} while (hscanner.next());
				}
			} catch (IOException e) {
				// Best effort: a failing file (e.g. removed by compaction) must not stop the others.
				System.err.println("failed to process hfile " + path);
				e.printStackTrace();
			} finally {
				if (hreader != null) {
					try {
						hreader.close();
					} catch (IOException e) {
						System.err.println("failed to close hfile reader " + path);
						e.printStackTrace();
					}
				}
			}
			System.out.println("total delete:" + count);
		}

	}
}

抱歉!评论已关闭.