现在的位置: 首页 > 综合 > 正文

Hadoop全排序中的Sampler采样器

2019年04月17日 ⁄ 综合 ⁄ 共 6065字 ⁄ 字号 评论关闭

        在Partitioner组件的设计与实现中,我们已经了解过Partitioner组件的其中一个和全排序相关的实现类——TotalOrderPartitioner。 

        我们知道,在Hadoop中,最终的处理结果集中的数据,除非就由一个Reduce Task处理,否则结果数据集只是局部有序而非全排序。 
        这节我们来学习在Hadoop中进行全排序操作中除了TotalOrderPartitioner之外的另一个组件——采样器Sampler。 
        在新版本的Hadoop中,内置了三个采样器: SplitSampler,RandomSampler和IntervalSampler这三个采样器都是InputSampler类的静态内部类,并且都实现了InputSampler类的内部接口Sampler,涉及的相关代码如下:

Java代码  收藏代码
  1. /** 
  2.  * Utility for collecting samples and writing a partition file for 
  3.  * {@link org.apache.hadoop.mapred.lib.TotalOrderPartitioner}. 
  4.  */  
  5. public class InputSampler<K,V> implements Tool {  
  6.   
  7.   ...  
  8.   
  9.   /** 
  10.    *采样器接口 
  11.    */  
  12.   public interface Sampler<K,V> {  
  13.     /** 
  14.      * 从输入数据几种获得一个数据采样的子集,然后通过这些采样数据在Map端由 
  15.      * TotalOrderPartitioner对处理数据做hash分组,以保证不同Reduce处理数据的有序性。 
  16.      * 该方法的具体采样逻辑由继承类实现。 
  17.      * For a given job, collect and return a subset of the keys from the 
  18.      * input data. 
  19.      */  
  20.     K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;  
  21.   }  
  22.   
  23.   /** 
  24.    * 分片数据采样器,即从N个分片中采样,效率最高 
  25.    * Samples the first n records from s splits. 
  26.    * Inexpensive way to sample random data. 
  27.    */  
  28.   public static class SplitSampler<K,V> implements Sampler<K,V> {  
  29.      ...  
  30.   }  
  31.   
  32.   /** 
  33.    * 通用的随机数据采样器,按一定的频率对所有数据做随机采样,效率很低 
  34.    * Sample from random points in the input. 
  35.    * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from 
  36.    * each split. 
  37.    */  
  38.   public static class RandomSampler<K,V> implements Sampler<K,V> {  
  39.     ...  
  40.   }  
  41.   
  42.   /** 
  43.    * 有固定采样间隔的数据采样器,适合有序的数据集,效率较随机数据采样器要好一些 
  44.    * Sample from s splits at regular intervals. 
  45.    * Useful for sorted data. 
  46.    */  
  47.   public static class IntervalSampler<K,V> implements Sampler<K,V> {  
  48.      ...  
  49.   }  
  50.   
  51.   ...  
  52.   
  53. }  
  54.           


        从上面的代码及注释中我们可以了解该采样器是如何对数据采样的。对于每一个分区,读取一条记录,将这条记录添加到样本集合中,如果当前样本数大于当前的采样分区所需要的样本数,则停止对这个分区的采样。如此循环遍历完这个分区的所有记录。 
        
        SplitSampler 
        我们首先着重来看一下SplitSampler采样器是如何对数据采样的,先看其取样处理逻辑代码:

Java代码  收藏代码
  1. /** 
  2.    * Samples the first n records from s splits. 
  3.    * Inexpensive way to sample random data. 
  4.    */  
  5.   public static class SplitSampler<K,V> implements Sampler<K,V> {  
  6.       
  7.     ...  
  8.   
  9.     /** 
  10.      * From each split sampled, take the first numSamples / numSplits records. 
  11.      */  
  12.     @SuppressWarnings("unchecked"// ArrayList::toArray doesn't preserve type  
  13.     public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {  
  14.       //通过InputFormat组件读取所有的分片信息,之前在InputFormat组件的学习中已学习过  
  15.       InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());  
  16.       ArrayList<K> samples = new ArrayList<K>(numSamples);  
  17.       //获得采样分区数,在最大采样数最大分区数和总分区数中选择较小的  
  18.       int splitsToSample = Math.min(maxSplitsSampled, splits.length);  
  19.       //获取采样分区间隔  
  20.       int splitStep = splits.length / splitsToSample;  
  21.       //计算获取每个分区的采样数  
  22.       int samplesPerSplit = numSamples / splitsToSample;  
  23.       long records = 0;  
  24.       for (int i = 0; i < splitsToSample; ++i) {  
  25.         //获取第(i * splitStep)分片的RecordReader对象,并由该对象解析将数据解析成key/value  
  26.         RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],  
  27.             job, Reporter.NULL);  
  28.         K key = reader.createKey();  
  29.         V value = reader.createValue();  
  30.         while (reader.next(key, value)) {//向采样的空key和value中读入数据  
  31.           //将采样的key加入samples数组  
  32.           samples.add(key);  
  33.           key = reader.createKey();  
  34.           ++records;  
  35.           if ((i+1) * samplesPerSplit <= records) {//判断是否满足采样数  
  36.             break;  
  37.           }  
  38.         }  
  39.         reader.close();  
  40.       }  
  41.       //返回采样的key的数组,供TotalOrderPartitioner使用  
  42.       return (K[])samples.toArray();  
  43.     }  
  44.   }  
  45.           


        IntervalSampler 
        再来看一下IntervalSampler采样器是如何来对数据采样的:

Java代码  收藏代码
  1. public static class IntervalSampler<K,V> implements Sampler<K,V> {  
  2.       
  3.     ...  
  4.   
  5.     /** 
  6.      * 根据一定的间隔从s个分区中采样数据,非常适合对排好序的数据采样 
  7.      * For each split sampled, emit when the ratio of the number of records 
  8.      * retained to the total record count is less than the specified 
  9.      * frequency. 
  10.      */  
  11.     @SuppressWarnings("unchecked"// ArrayList::toArray doesn't preserve type  
  12.     public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {  
  13.       //通过InputFormat组件读取所有的分片信息  
  14.       InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());  
  15.       ArrayList<K> samples = new ArrayList<K>();  
  16.       //获得采样分区数,在最大采样数最大分区数和总分区数中选择较小的  
  17.       int splitsToSample = Math.min(maxSplitsSampled, splits.length);  
  18.       //获取采样分区间隔  
  19.       int splitStep = splits.length / splitsToSample;  
  20.       long records = 0;  
  21.       long kept = 0;  
  22.       for (int i = 0; i < splitsToSample; ++i) {  
  23.         //获取第(i * splitStep)分片的RecordReader对象,并由该对象解析将数据解析成key/value  
  24.         RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],  
  25.             job, Reporter.NULL);  
  26.         K key = reader.createKey();  
  27.         V value = reader.createValue();  
  28.         while (reader.next(key, value)) {//向采样的空key和value中读入数据  
  29.           ++records;  
  30.           if ((double) kept / records < freq) {//判断当前样本数与已经读取的记录数的比值小于freq  
  31.             ++kept;  
  32.             samples.add(key);  
  33.             key = reader.createKey();  
  34.           }  
  35.         }  
  36.         reader.close();  
  37.       }  
  38.       //返回采样的key的数组,供TotalOrderPartitioner使用  
  39.       return (K[])samples.toArray();  
  40.     }  
  41.   }  
  42.           


        从上面的代码可以看到,该采样器和SplitSampler采样器很像。对于每一个分区,读取一条记录,如果当前样本数与已经读取的记录数的比值小于freq,则将这条记录添加到样本集合,否则读取下一条记录。这样依次循环遍历完这个分区的所有记录。 

        RandomSampler 
        RandomSampler是一个随机数据采样器,效率最低,其采样过程是这样处理的: 
        首先通过InputFormat的getSplits方法得到所有的输入分区;然后确定需要抽样扫描的分区数目,取输入分区总数与用户输入的maxSplitsSampled两者的较小的值得到splitsToSample;然后对输入分区数组shuffle排序,打乱其原始顺序;然后循环逐个扫描每个分区中的记录进行采样,循环的条件是当前已经扫描的分区数小于splitsToSample或者当前已经扫描的分区数超过了splitsToSample但是小于输入分区总数并且当前的采样数小于最大采样数numSamples。 
        每个分区中记录采样的具体过程如下: 
从指定分区中取出一条记录,判断得到的随机浮点数是否小于等于采样频率freq,如果大于则放弃这条记录,然后判断当前的采样数是否小于最大采样数,如果小于则这条记录被选中,被放进采样集合中,否则从【0,numSamples】中选择一个随机数,如果这个随机数不等于最大采样数numSamples,则用这条记录替换掉采样集合随机数对应位置的记录,同时采样频率freq减小变为freq*(numSamples-1)/numSamples。然后依次遍历分区中的其它记录。 


        下面是几个采样器之间的比较: 
 

        当然,如果Hadoop内置的采样器不满足用户需求,那么用户可以完全编写自定义的采样器。 

       

   转载自http://flyingdutchman.iteye.com/blog/1878962

抱歉!评论已关闭.