现在的位置: 首页 > 综合 > 正文

Hadoop基本使用

2018年02月20日 ⁄ 综合 ⁄ 共 4084字 ⁄ 字号 评论关闭

1、删除output路径所在的文件

HadoopUtil.delete(conf, output);

2、获取HDFS上某一路径下的所有文件

2.1 方法1

FileSystem hdfs = null;
try {
//hdfs = FileSystem.get(URI.create("hdfs://localhost:9000/"),conf);
hdfs = FileSystem.get(conf);
} catch (Exception e) {
// TODO Auto-generated catch block
log.info("Create HDFS File System failed.");
e.printStackTrace();
}
//Path path = hdfs.getHomeDirectory();
FileStatus[] srcFileStatus = null;

try {
srcFileStatus = hdfs.listStatus(new Path(clusteredPointsPath));
} catch (Exception e) {
// TODO Auto-generated catch block
log.info("Get HDFS status failed. ");
e.printStackTrace();
}  
        Path[] srcFilePath = FileUtil.stat2Paths(srcFileStatus); 
        List<String> fileNames = new ArrayList<String>();
        for (int i = 0; i < srcFilePath.length; i++)  
        {  
            String srcFile = srcFilePath[i].toString();  
            int fileNamePosi = srcFile.lastIndexOf('/');  
            String fileName = srcFile.substring(fileNamePosi + 1);
            if(fileName.contains("part-"))
            {
            fileNames.add(srcFile);
            }
        }  
 

2.2 方法2

FileSystem fs = FileSystem.get(output.toUri(), conf);
    HadoopUtil.delete(conf, output);
    Path outFile = new Path(output, "part-randomSeed");
    boolean newFile = fs.createNewFile(outFile);
    if (newFile)
{
      Path inputPathPattern;
      if (fs.getFileStatus(input).isDir())
{
        inputPathPattern = new Path(input, "*");
      } else {
        inputPathPattern = input;
      }
      
      FileStatus[] inputFiles = fs.globStatus(inputPathPattern, PathFilters.logsCRCFilter());
      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outFile, Text.class, Cluster.class);
      Random random = RandomUtils.getRandom();
      List<Text> chosenTexts = Lists.newArrayListWithCapacity(k);
      List<Cluster> chosenClusters = Lists.newArrayListWithCapacity(k);
      int nextClusterId = 0;
      
      for (FileStatus fileStatus : inputFiles)

{

fileStatus处理        
      }

3、在MapReduce外部传递信息到MapReduce里面

3.1 在调用作业之前:

Configuration conf = new Configuration();

conf.set("key","value");  //在外部设置key的值为value

3.2 在运行MapReduce作业时:

  先回忆一下。Mapper有setup(),map(),cleanup()和run()四个方法。其中setup()一般是用来进行一些map()前的准备工作,map()则一般承担主要的处理工作,cleanup()则是收尾工作如关闭文件或者执行map()后的K-V分发等。run()方法提供了setup->map->cleanup()的执行模板。

  在MapReduce中,Mapper从一个输入分片中读取数据,然后经过Shuffle and Sort阶段,分发数据给Reducer,在Map端和Reduce端我们可能使用设置的Combiner进行合并,这在Reduce前进行。Partitioner控制每个K-V对应该被分发到哪个reducer[我们的Job可能有多个reducer],Hadoop默认使用HashPartitioner,HashPartitioner使用key的hashCode对reducer的数量取模得来。

  1. public void run(Context context) throws IOException, InterruptedException {  
  2.   setup(context);  
  3.   while (context.nextKeyValue()) {  
  4.     map(context.getCurrentKey(), context.getCurrentValue(), context);  
  5.   }  
  6.   cleanup(context);  
  7. }  

从上面run方法可以看出,K/V对是从传入的Context获取的。我们也可以从下面的map方法看出,输出结果K/V对也是通过Context来完成的。至于Context暂且放着。

  1. @SuppressWarnings("unchecked")  
  2. protected void map(KEYIN key, VALUEIN value,   
  3.                    Context context) throws IOException, InterruptedException {  
  4.   context.write((KEYOUT) key, (VALUEOUT) value);  
  5. }   

所以在map内部为了获得某些信息可以重写setup()在该方法中设置外部使用的参数,如下:

@Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration conf = context.getConfiguration();
    String clusterPath = conf.get("path");
....   
  }

4、如果将多个小文件放在一个文件夹里file中,给mapreduce作业传递输入路径时,只需要传递到*/file即可(*代表上层路径),Hadoop会自动查找该文件夹下的文件,如果文件大小小于HDFS文件块大小,map的数量由文件个数确定;如果文件大小大于HDFS文件块大小,则map数量由文件大小除以HDFS文件块大小后的值向右取整数。

5、向HDFS文件系统写入数据

Configuration conf = context.getConfiguration();
String outputPath = conf.get("outputDataString");
HadoopUtil.delete(conf, new Path(outputPath));

    FileSystem fs = FileSystem.get(conf);
    FSDataOutputStream hdfsOutStream = fs.create(new Path(outputPath));
    BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(hdfsOutStream));
    bufferedWriter.write(confusionMatrix.toString());
    bufferedWriter.close();

5、有时关闭Hadoop守护进程(stop-all.sh),下次再启动(start-all.sh)后,之前所有的HDFS文件都不能使用,需要重新格式化系统,因为每次重启系统时临时文件被删除了,做如下处理即可:在conf文件夹的core-site.xml中增加如下属性

<property>
         <name>hadoop.tmp.dir</name>
         <value>/home/zhongchao/hadooptmp</value>
         <description>A base for other temporary directories</description>
     </property>

6、Hadoop集群调优

http://blog.csdn.net/dajuezhao/article/details/6591034

7、Hadoop运行内存溢出解决方法

http://blog.sina.com.cn/s/blog_8c6d7ff6010101sr.html

抱歉!评论已关闭.