亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

Hadoop mapreduce自定義分組RawComparator

2019-11-14 22:41:39
字體:
來源:轉載
供稿:網友
Hadoop maPReduce自定義分組RawComparator

本文發表于本人博客。

今天接著上次【Hadoop mapreduce自定義排序WritableComparable】文章寫,按照順序那么這次應該是講解自定義分組如何實現,關于操作順序在這里不多說了,需要了解的可以看看我在博客園的評論,現在開始。

首先我們查看下Job這個類,發現有setGroupingComparatorClass()這個方法,具體源碼如下:

  /**   * Define the comparator that controls which keys are grouped together   * for a single call to    * {@link Reducer#reduce(Object, Iterable,    *                       org.apache.hadoop.mapreduce.Reducer.Context)}   * @param cls the raw comparator to use   * @throws IllegalStateException if the job is submitted   */  public void setGroupingComparatorClass(Class<? extends RawComparator> cls                                         ) throws IllegalStateException {    ensureState(JobState.DEFINE);    conf.setOutputValueGroupingComparator(cls);  }

從方法的源碼可以看出這個方法是定義自定義鍵分組功能。設置這個自定義分組類必須滿足extends RawComparator,那我們可以看下這個類的源碼:

/** * <p> * A {@link Comparator} that Operates directly on byte representations of * objects. * </p> * @param <T> * @see DeserializerComparator */public interface RawComparator<T> extends Comparator<T> {  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);}

然而這個RawComparator是泛型繼承Comparator接口的,簡單看了下那我們來自定義一個類繼承RawComparator,代碼如下:

public class MyGrouper implements RawComparator<SortAPI> {    @Override    public int compare(SortAPI o1, SortAPI o2) {        return (int)(o1.first - o2.first);    }    @Override    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {        int compareBytes = WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);        return compareBytes;    }    }

源碼中SortAPI是上節自定義排序中的定義對象,第一個方法從注釋可以看出是比較2個參數的大小,返回的是自然整數;第二個方法是在反序列化時比較,所以需要是用字節比較。接下來我們繼續看看自定義MyMapper類:

public class MyMapper extends Mapper<LongWritable, Text, SortAPI, LongWritable> {        @Override    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {        String[] splied = value.toString().split("/t");        try {            long first = Long.parseLong(splied[0]);            long second = Long.parseLong(splied[1]);            context.write(new SortAPI(first,second), new LongWritable(1));        } catch (Exception e) {            System.out.println(e.getMessage());        }    }    }

自定義MyReduce類:

public class MyReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {    @Override    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {        context.write(new LongWritable(key.first), new LongWritable(key.second));    }    }

自定義SortAPI類:

public class SortAPI implements WritableComparable<SortAPI> {    public Long first;    public Long second;    public SortAPI(){            }    public SortAPI(long first,long second){        this.first = first;        this.second = second;    }    @Override    public int compareTo(SortAPI o) {        return (int) (this.first - o.first);    }    @Override    public void write(DataOutput out) throws IOException {        out.writeLong(first);        out.writeLong(second);    }    @Override    public void readFields(DataInput in) throws IOException {        this.first = in.readLong();        this.second = in.readLong();            }    @Override    public int hashCode() {        return this.first.hashCode() + this.second.hashCode();    }    @Override    public boolean equals(Object obj) {        if(obj instanceof SortAPI){            SortAPI o = (SortAPI)obj;            return this.first == o.first && this.second == o.second;        }        return false;    }        @Override    public String toString() {        return "輸出:" + this.first + ";" + this.second;    }    }

接下來準備數據,數據如下:

1       21       13       03       22       21       2

上傳至hdfs://hadoop-master:9000/grouper/input/test.txt,main代碼如下:

public class Test {    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/grouper/output/";    static final String INPUT_DIR = "hdfs://hadoop-master:9000/grouper/input/test.txt";    public static void main(String[] args) throws Exception {        Configuration conf = new Configuration();        Job job = new Job(conf, Test.class.getSimpleName());            job.setJarByClass(Test.class);        deleteOutputFile(OUTPUT_DIR);        //1設置輸入目錄        FileInputFormat.setInputPaths(job, INPUT_DIR);        //2設置輸入格式化類        job.setInputFormatClass(TextInputFormat.class);        //3設置自定義Mapper以及鍵值類型        job.setMapperClass(MyMapper.class);        job.setMapOutputKeyClass(SortAPI.class);        job.setMapOutputValueClass(LongWritable.class);        //4分區        job.setPartitionerClass(HashPartitioner.class);        job.setNumReduceTasks(1);        //5排序分組        job.setGroupingComparatorClass(MyGrouper.class);        //6設置在一定Reduce以及鍵值類型        job.setReducerClass(MyReduce.class);        job.setOutputKeyClass(LongWritable.class);        job.setOutputValueClass(LongWritable.class);        //7設置輸出目錄        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));        //8提交job        job.waitForCompletion(true);    }        static void deleteOutputFile(String path) throws Exception{        Configuration conf = new Configuration();        FileSystem fs = FileSystem.get(new URI(INPUT_DIR),conf);        if(fs.exists(new Path(path))){            fs.delete(new Path(path));        }    }}

執行代碼,然后在節點上用終端輸入:hadoop fs -text /grouper/output/part-r-00000查看結果:

1       22       23       0

接下來我們修改下SortAPI類的compareTo()方法:

    @Override    public int compareTo(SortAPI o) {        long mis = (this.first - o.first) * -1;        if(mis != 0 ){            return (int)mis;        }        else{            return (int)(this.second - o.second);        }    }

再次執行并查看/grouper/output/part-r-00000文件:

3       02       21       1

這樣我們就得出了同樣的數據分組結果會受到排序算法的影響,比如排序是倒序那么分組也是先按照倒序數據源進行分組輸出。我們還可以在map函數以及reduce函數中打印記錄(過程省略)這樣經過對比也得出分組階段:鍵值對中key相同(即compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法返回0)的則為一組,當前組再按照順序選擇第一個往緩沖區輸出(也許會存儲到硬盤)。其它的相同key的鍵值對就不會再往緩沖區輸出了。在百度上檢索到這邊文章,其中它的分組是把map函數輸出的value全部迭代到同一個key中,就相當于上面{key,value}:{1,{2,1,2}},這個結果跟最開始沒有自定義分組時是一樣的,我們可以在reduce函數輸出Iterable<LongWritable> values進行查看,其實我覺得這樣的才算是分組吧就像數據查詢一樣。

在這里我們應該要弄懂分組與分區的區別。分區是對輸出結果文件進行分類拆分文件以便更好查看,比如一個輸出文件包含所有狀態的http請求,那么為了方便查看通過分區把請求狀態分成幾個結果文件。分組就是把一些相同鍵的鍵值對進行計算減少輸出;分區之后數據全部還是照樣輸出到reduce端,而分組的話就有所減少了;當然這2個步驟也是不同的階段執行。

這次先到這里。堅持記錄點點滴滴!


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
尤物精品国产第一福利三区| 911国产网站尤物在线观看| 91精品久久久久久久久久另类| 亚洲高清不卡av| 日本一区二三区好的精华液| 欧美精品videosex性欧美| 国产精品九九久久久久久久| 亚洲第一黄色网| 国产精品视频网站| 欧美日韩在线一区| 粉嫩av一区二区三区免费野| 亚洲精品日韩丝袜精品| 91免费在线视频| 日韩精品免费观看| 91午夜理伦私人影院| 永久免费精品影视网站| 久久婷婷国产麻豆91天堂| 国产午夜精品一区理论片飘花| 色综久久综合桃花网| 久久天天躁夜夜躁狠狠躁2022| 久久人人爽人人爽人人片av高清| 日韩禁在线播放| 久久天堂av综合合色| 久久人91精品久久久久久不卡| 91久久在线播放| 91亚洲精品一区| 92福利视频午夜1000合集在线观看| 富二代精品短视频| 成人妇女淫片aaaa视频| 欧美疯狂xxxx大交乱88av| 国产精品视频午夜| 日韩在线免费高清视频| 自拍偷拍亚洲在线| 国产91在线播放| 日韩在线精品一区| 亚洲精品久久7777777| 欧美中文在线免费| 国产激情久久久久| 精品久久久久久| 综合欧美国产视频二区| 日本精品免费一区二区三区| 丁香五六月婷婷久久激情| 97视频在线观看亚洲| 夜夜嗨av一区二区三区四区| 在线观看中文字幕亚洲| 亚洲aⅴ日韩av电影在线观看| 国产亚洲免费的视频看| 欧美激情久久久久| 欧美精品福利视频| 亚洲人成网站免费播放| 亚洲精品久久久久| 久久久精品影院| 国产一区在线播放| 国产精品伦子伦免费视频| 国产91成人video| 亚洲激情在线观看视频免费| 日本成人激情视频| 亚洲aⅴ日韩av电影在线观看| 韩国v欧美v日本v亚洲| 亚洲xxx大片| 欧美国产日韩中文字幕在线| 国产精品黄色av| 2019av中文字幕| 欧美在线精品免播放器视频| 欧美激情网友自拍| 亚洲老头同性xxxxx| 日韩综合视频在线观看| 久久91超碰青草是什么| 深夜福利91大全| 18久久久久久| 最新国产精品亚洲| 国产精品成人观看视频国产奇米| 日韩美女免费视频| 久久天天躁狠狠躁夜夜av| 国产精品久久久| 亚洲视频欧美视频| 伊人久久久久久久久久久| 欧美激情高清视频| 欧美性少妇18aaaa视频| 欧美激情在线观看| 久久久久久网址| 成人精品视频99在线观看免费| 91产国在线观看动作片喷水| 91久久在线视频| 日韩免费av片在线观看| 国产亚洲综合久久| 91精品啪aⅴ在线观看国产| 欧美精品久久久久久久久| 国内精品久久久久久久久| 亚洲va码欧洲m码| 91高清视频免费观看| 伦理中文字幕亚洲| 欧美专区中文字幕| 日韩av三级在线观看| 欧美性猛交xxx| 久久精品成人一区二区三区| 久久97精品久久久久久久不卡| 亚洲国产成人爱av在线播放| 欧美午夜电影在线| 亚洲人精选亚洲人成在线| 日韩电影在线观看中文字幕| 日韩在线免费观看视频| 亚洲第一在线视频| 亚洲xxxx18| 欧美成人剧情片在线观看| 亚洲成人免费网站| 日韩av免费看网站| 亚洲一区二区三| 国产成人a亚洲精品| 亚洲国产精品美女| 亚洲欧美日韩在线一区| 日韩的一区二区| 亚洲人午夜色婷婷| 国产一区二区三区三区在线观看| 亚洲欧美日韩一区二区在线| 日韩在线一区二区三区免费视频| 中文字幕精品一区二区精品| 国产精品欧美激情在线播放| 亚洲国产精品va在看黑人| 久久久在线免费观看| 欧美亚洲伦理www| 精品在线小视频| www.欧美三级电影.com| 精品人伦一区二区三区蜜桃免费| 亚洲a在线播放| 欧美一性一乱一交一视频| 日韩免费电影在线观看| 久久6免费高清热精品| 亚洲日本中文字幕| 欧美激情精品久久久久久久变态| 国产精品吊钟奶在线| 91av在线免费观看视频| 欧美成人午夜剧场免费观看| 欧美色播在线播放| 91老司机在线| 国语自产精品视频在线看一大j8| 一区二区三区视频免费在线观看| 影音先锋欧美在线资源| 国产视频精品免费播放| 一本大道香蕉久在线播放29| 91麻豆国产精品| 精品国产31久久久久久| 日韩美女视频免费在线观看| 国产精品久久久久久久久影视| 97国产精品视频人人做人人爱| 欧美国产极速在线| 国产成人福利网站| 国产成+人+综合+亚洲欧美丁香花| 国产不卡av在线免费观看| 欧美日韩国产黄| 国产精品日韩精品| 欧美人成在线视频| 日韩精品电影网| 97在线视频一区| 久久久久久久久久国产| 亚洲一区亚洲二区亚洲三区| 国产视频综合在线| 国产一区二区三区毛片| 精品久久国产精品| 日韩免费av片在线观看| 欧美高清视频在线播放| 性欧美暴力猛交69hd| 亚洲精品456在线播放狼人| 国产精品扒开腿做爽爽爽的视频|