现在的位置: 首页 > 综合 > 正文

Java API操作HDFS

2018年06月05日 ⁄ 综合 ⁄ 共 5012字 ⁄ 字号 评论关闭

HDFS是存储数据的分布式文件系统,对HDFS的操作,就是对文件系统的操作,除了用HDFS的shell命令对文件系统进行操作,我们也可以利用Java API对文件系统进行操作,比如文件的创建、删除、修改权限等等,还有文件夹的创建、删除、重命名等等。

使用Java API对文件系统进行操作主要涉及以下几个类:

1.Configuration类:该类的对象封装了客户端或者服务端的配置。

2.FileSystem类:该类的对象是一个文件系统对象,可以利用该对象的一些方法来对文件进行操作,FileSystem是一个抽象,不能通过new来获取对象,应该通过Filesystem的静态方法get()来获取:

//创建URI对象
			//PATH = "hdfs://liaozhongmin:9000/";
			URI uri = new URI(PATH);
			//获取文件系统
			FileSystem fileSystem = FileSystem.get(uri, new Configuration());

3.FSDataInputStream和FSDataOutputStream:这两个类是HDFS中的输入输出流。分别通过FileSystem的open()方法和create()方法获得。

在使用Java API 操作HDFS之前,首先要保证hadoop是正常启动的,可以通过jps命令来查看hadoop的进程是否全部启动,如下图:

注:如上图所示,使用jps命令可以看到Hadoop的五个进程即NameNode、DataNode、SecondaryNameNode、TaskTrackers、JobTracker启动了,就表示hadoop启动成功了。

下面是关于Java API操作HDFS文件系统的常用工具类:

public class FileSystemUtil {

	//定义HDFS的路径
	private static final String PATH = "hdfs://liaozhongmin:9000/";
	
	/**
	 * 获取文件系统FileSystem
	 * @return
	 */
	public static FileSystem getFileSystem(){
		
		try {
			//创建URI对象
			//PATH = "hdfs://liaozhongmin:9000/";
			URI uri = new URI(PATH);
			//获取文件系统
			FileSystem fileSystem = FileSystem.get(uri, new Configuration());
			return fileSystem;
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		return null;
	}
	
	/**
	 * 创建文件夹
	 * @param fileSystem
	 */
	public static void mkdir(FileSystem fileSystem,String path){
		
		try {
			//创建Path对象
			Path srcPath = new Path(path);
			//通过文件系统对象创建文件夹
			fileSystem.mkdirs(srcPath);
		} catch (IOException e) {
			e.printStackTrace();
		} finally{
			try {
				fileSystem.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	/**
	 * 创建新文件
	 * @param dst
	 * @param contents
	 */
	public static void createFile(String dst,byte[] contents){
		
		//定义文件系统
		FileSystem fileSystem = null;
		//定义输出流
		FSDataOutputStream outputStream = null;
		try {
			//获取文件系统
			fileSystem = FileSystemUtil.getFileSystem();
			//创建一个目标路径
			Path path = new Path(dst);
			//创建文件
			outputStream = fileSystem.create(path);
			//写数据
			outputStream.write(contents);
			System.out.println("文件创建成功!");
		} catch (IOException e) {
			e.printStackTrace();
		} finally{
			//关闭
			try {
				fileSystem.close();
				outputStream.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
			
		}
		
	}
	/**
	 * 向文件系统中上传本地数据
	 * @param fileSystem
	 */
	public static void putData(FileSystem fileSystem,String src,String dst){
		
		try {
			//创建原路径Path对象
			Path srcPath = new Path(src);
			//创建目标路径Path对象
			Path dstPath = new Path(dst);
			//调用文件系统的文件爱你复制函数,前面的参数是指是否删除源文件,true为删除,否则不删除
			fileSystem.copyFromLocalFile(false, srcPath, dstPath);
			
		} catch (IOException e) {
			e.printStackTrace();
		} finally{
			//关闭文件系统
			try {
				fileSystem.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	/**
	 * 文件重命名
	 * @param oldName 旧名字
	 * @param newName 新名字
	 */
	public static void rename(FileSystem fileSystem,String oldName,String newName){
		
		try {
			//创建旧文件的Path对象
			Path oldPath = new Path(oldName);
			Path newPath = new Path(newName);
			//重命名
			fileSystem.rename(oldPath, newPath);
		} catch (Exception e) {
			e.printStackTrace();
		} finally{
			try {
				fileSystem.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	/**
	 * 下载文件
	 * @param fileSystem
	 * @param fileName
	 */
	public static void getData(FileSystem fileSystem,String src,String dst){
		
		
		try {
			//创建Path对象
			Path path = new Path(src);
			//通过path构建文件系统的输入流
			FSDataInputStream in = fileSystem.open(path);
			//构建文件
			File file = new File(dst);
			
			if (!file.exists()){
				file.createNewFile();
			}
			//构建输出流
			FileOutputStream fileOutputStream = new FileOutputStream(file);
			//下载
			IOUtils.copyBytes(in, fileOutputStream, 1024, true);
		} catch (IOException e) {
			
			e.printStackTrace();
		}
	}
	
	/**
	 * 遍历文件系统中的某个目录
	 * @param fileSystem
	 */
	public static void listFile(FileSystem fileSystem,String path){
		
		try {
			//调用listStatus()方法获取一个文件数组
			FileStatus[] listStatus = fileSystem.listStatus(new Path(path));
			//循环遍历
			for (FileStatus fileStatus : listStatus){
				//判断是否为目录
				String isDir = fileStatus.isDir()?"是文件夹":"是文件";
				//获取文件的权限
				String permission = fileStatus.getPermission().toString();
				//获取备份
				short replication = fileStatus.getReplication();
				//获取数组的长度
				long len = fileStatus.getLen();
				//获取文件的路径
				String filePath = fileStatus.getPath().toString();
				//打印数据
				System.out.println("isDir:" +isDir + "\npermission:" + permission + "\nreplication:" + replication+ "\nlen:" + len + "\nfilepath:" + filePath);
			}
		} catch (IOException e) {
			
			e.printStackTrace();
		}
	}
	
	/**
	 * 删除文件
	 * @param fileSystem
	 */
	public static void remove(FileSystem fileSystem,String filePath){
		
		try {
			//创建path对象
			Path path = new Path(filePath);
			//通过文件系统进行删除,第二个参数设置为true时才可以删除为目录的文件,否则的话如果是目录而不是文件就会报错的
			fileSystem.delete(path, true);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	/**
	 * 工具类的测试
	 * @param args
	 */
	public static void main(String[] args) {
		
		//创建文件系统
		FileSystem fileSystem = getFileSystem();
		
		//创建文件夹
		//FileSystemUtil.mkdir(fileSystem, "/myDir");
		
		//创建文件并写入内容
		//byte[] contents = "我爱你".getBytes();
		//FileSystemUtil.createFile("/myDir/liao.txt", contents);
		
		//文件重命名
		//FileSystemUtil.rename(fileSystem, "/myDir/liao.txt", "/myDir/liao_bak");
		
		//上传文件
		//FileSystemUtil.putData(fileSystem, "D:\\Android\\test.txt", "/myDir");
		
		//下载文件
		//FileSystemUtil.getData(fileSystem, "/myDir/test.txt", "D:\\test.txt");
		
		//遍历文件系统的某个目录
		//FileSystemUtil.listFile(fileSystem, "/");
		
		//删除文件
		FileSystemUtil.remove(fileSystem, "/dir/liaozhongmin.txt");
	}
}

注:远程文件系统的路径,请自行在hadoop/conf/core-site.xml文件中配置:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
	 <property>
	         <name>fs.default.name</name>
		<!--文件系统的路径-->
	         <value>hdfs://liaozhongmin:9000</value>
	  </property>

	  <property>
	          <name>hadoop.tmp.dir</name>
		 <!--文件保存的位置(上传后以块的形式保存)-->
	          <value>/usr/local/hadoop/tmp</value>
	  </property>  

</configuration>

抱歉!评论已关闭.