亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 數據庫 > MongoDB > 正文

通用MapReduce程序復制HBase表數據

2020-10-29 18:42:33
字體:
來源:轉載
供稿:網友

編寫MR程序,讓其可以適合大部分的HBase表數據導入到HBase表數據。其中包括可以設置版本數、可以設置輸入表的列導入設置(選取其中某幾列)、可以設置輸出表的列導出設置(選取其中某幾列)。

原始表test1數據如下:

每個row key都有兩個版本的數據,這里只顯示了row key為1的數據

 在hbase shell 中創建數據表:

create 'test2',{NAME => 'cf1',VERSIONS => 10}  // 保存無版本、無列導入設置、無列導出設置的數據create 'test3',{NAME => 'cf1',VERSIONS => 10}  // 保存無版本、無列導入設置、有列導出設置的數據create 'test4',{NAME => 'cf1',VERSIONS => 10}  // 保存無版本、有列導入設置、無列導出設置的數據create 'test5',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、無列導入設置、無列導出設置的數據create 'test6',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、無列導入設置、有列導出設置的數據create 'test7',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、有列導入設置、無列導出設置的數據create 'test8',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、有列導入設置、有列導出設置的數據

main函數入口:

package GeneralHBaseToHBase;import org.apache.hadoop.util.ToolRunner;public class DriverTest { public static void main(String[] args) throws Exception { // 無版本設置、無列導入設置,無列導出設置 String[] myArgs1= new String[]{ "test1", // 輸入表 "test2", // 輸出表 "0",  // 版本大小數,如果值為0,則為默認從輸入表導出最新的數據到輸出表 "-1", // 列導入設置,如果為-1 ,則沒有設置列導入 "-1" // 列導出設置,如果為-1,則沒有設置列導出 };  ToolRunner.run(HBaseDriver.getConfiguration(),  new HBaseDriver(), myArgs1); // 無版本設置、有列導入設置,無列導出設置 String[] myArgs2= new String[]{ "test1", "test3", "0", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "-1" }; ToolRunner.run(HBaseDriver.getConfiguration(),  new HBaseDriver(), myArgs2); // 無版本設置,無列導入設置,有列導出設置 String[] myArgs3= new String[]{ "test1", "test4", "0", "-1", "cf1:c1,cf1:c10,cf1:c14" }; ToolRunner.run(HBaseDriver.getConfiguration(),  new HBaseDriver(), myArgs3); // 有版本設置,無列導入設置,無列導出設置 String[] myArgs4= new String[]{ "test1", "test5", "2", "-1", "-1" }; ToolRunner.run(HBaseDriver.getConfiguration(),  new HBaseDriver(), myArgs4); // 有版本設置、有列導入設置,無列導出設置 String[] myArgs5= new String[]{ "test1", "test6", "2", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "-1" }; ToolRunner.run(HBaseDriver.getConfiguration(),  new HBaseDriver(), myArgs5);  // 有版本設置、無列導入設置,有列導出設置 String[] myArgs6= new String[]{ "test1", "test7", "2", "-1", "cf1:c1,cf1:c10,cf1:c14" }; ToolRunner.run(HBaseDriver.getConfiguration(),  new HBaseDriver(), myArgs6); // 有版本設置、有列導入設置,有列導出設置 String[] myArgs7= new String[]{ "test1", "test8", "2", "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14", "cf1:c1,cf1:c10,cf1:c14" }; ToolRunner.run(HBaseDriver.getConfiguration(),  new HBaseDriver(), myArgs7); } }

driver:

package GeneralHBaseToHBase;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.util.Tool;import util.JarUtil;  public class HBaseDriver extends Configured implements Tool{ public static String FROMTABLE=""; //導入表 public static String TOTABLE=""; //導出表 public static String SETVERSION=""; //是否設置版本 // args => {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable} @Override public int run(String[] args) throws Exception { if(args.length!=5){ System.err.println("Usage:/n demo.job.HBaseDriver <input> <inputTable> "  + "<output> <outputTable>"  +"< versions >"  + " <set columns from inputTable> like <cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14> or <-1> "  + "<set columns from outputTable> like <cf1:c1,cf1:c10,cf1:c14> or <-1>"); return -1; } Configuration conf = getConf(); FROMTABLE = args[0]; TOTABLE = args[1]; SETVERSION = args[2]; conf.set("SETVERSION", SETVERSION); if(!args[3].equals("-1")){ conf.set("COLUMNFROMTABLE", args[3]); } if(!args[4].equals("-1")){ conf.set("COLUMNTOTABLE", args[4]); } String jobName ="From table "+FROMTABLE+ " ,Import to "+ TOTABLE; Job job = Job.getInstance(conf, jobName); job.setJarByClass(HBaseDriver.class); Scan scan = new Scan(); // 判斷是否需要設置版本 if(SETVERSION != "0" || SETVERSION != "1"){ scan.setMaxVersions(Integer.parseInt(SETVERSION)); } // 設置HBase表輸入:表名、scan、Mapper類、mapper輸出鍵類型、mapper輸出值類型 TableMapReduceUtil.initTableMapperJob( FROMTABLE,  scan,  HBaseToHBaseMapper.class,  ImmutableBytesWritable.class,  Put.class,  job); // 設置HBase表輸出:表名,reducer類 TableMapReduceUtil.initTableReducerJob(TOTABLE, null, job); // 沒有 reducers, 直接寫入到 輸出文件  job.setNumReduceTasks(0);   return job.waitForCompletion(true) ? 0 : 1;   } private static Configuration configuration; public static Configuration getConfiguration(){ if(configuration==null){ /** * TODO 了解如何直接從Windows提交代碼到Hadoop集群 *  并修改其中的配置為實際配置 */ configuration = new Configuration(); configuration.setBoolean("mapreduce.app-submission.cross-platform", true);// 配置使用跨平臺提交任務 configuration.set("fs.defaultFS", "hdfs://master:8020");// 指定namenode configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架 configuration.set("yarn.resourcemanager.address", "master:8032"); // 指定resourcemanager configuration.set("yarn.resourcemanager.scheduler.address", "master:8030");// 指定資源分配器 configuration.set("mapreduce.jobhistory.address", "master:10020");// 指定historyserver configuration.set("hbase.master", "master:16000"); configuration.set("hbase.rootdir", "hdfs://master:8020/hbase"); configuration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3"); configuration.set("hbase.zookeeper.property.clientPort", "2181"); //TODO 需export->jar file ; 設置正確的jar包所在位置 configuration.set("mapreduce.job.jar",JarUtil.jar(HBaseDriver.class));// 設置jar包路徑 }  return configuration; }  }

mapper:

package GeneralHBaseToHBase;import java.io.IOException;import java.util.ArrayList;import java.util.HashMap;import java.util.HashSet;import java.util.Map.Entry;import java.util.NavigableMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.Cell;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.util.Bytes;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class HBaseToHBaseMapper extends TableMapper<ImmutableBytesWritable, Put> { Logger log = LoggerFactory.getLogger(HBaseToHBaseMapper.class); private static int versionNum = 0; private static String[] columnFromTable = null; private static String[] columnToTable = null; private static String column1 = null; private static String column2 = null; @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); versionNum = Integer.parseInt(conf.get("SETVERSION", "0")); column1 = conf.get("COLUMNFROMTABLE",null); if(!(column1 == null)){ columnFromTable = column1.split(","); } column2 = conf.get("COLUMNTOTABLE",null);  if(!(column2 == null)){ columnToTable = column2.split(","); } } @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { context.write(key, resultToPut(key,value)); }  /*** * 把key,value轉換為Put * @param key * @param value * @return * @throws IOException */ private Put resultToPut(ImmutableBytesWritable key, Result value) throws IOException { HashMap<String, String> fTableMap = new HashMap<>(); HashMap<String, String> tTableMap = new HashMap<>(); Put put = new Put(key.get()); if(! (columnFromTable == null || columnFromTable.length == 0)){ fTableMap = getFamilyAndColumn(columnFromTable); } if(! (columnToTable == null || columnToTable.length == 0)){ tTableMap = getFamilyAndColumn(columnToTable); } if(versionNum==0){       if(fTableMap.size() == 0){    if(tTableMap.size() == 0){   for (Cell kv : value.rawCells()) {  put.add(kv); // 沒有設置版本,沒有設置列導入,沒有設置列導出  }  return put; } else{  return getPut(put, value, tTableMap); // 無版本、無列導入、有列導出 } } else { if(tTableMap.size() == 0){  return getPut(put, value, fTableMap);// 無版本、有列導入、無列導出 } else {  return getPut(put, value, tTableMap);// 無版本、有列導入、有列導出 } } } else{ if(fTableMap.size() == 0){ if(tTableMap.size() == 0){  return getPut1(put, value); // 有版本,無列導入,無列導出 }else{  return getPut2(put, value, tTableMap); //有版本,無列導入,有列導出 } }else{ if(tTableMap.size() == 0){  return getPut2(put,value,fTableMap);// 有版本,有列導入,無列導出 }else{  return getPut2(put,value,tTableMap); // 有版本,有列導入,有列導出 } } } } /*** * 無版本設置的情況下,對于有列導入或者列導出 * @param put * @param value * @param tableMap * @return * @throws IOException */  private Put getPut(Put put,Result value,HashMap<String, String> tableMap) throws IOException{ for(Cell kv : value.rawCells()){ byte[] family = kv.getFamily(); if(tableMap.containsKey(new String(family))){ String columnStr = tableMap.get(new String(family)); ArrayList<String> columnBy = toByte(columnStr); if(columnBy.contains(new String(kv.getQualifier()))){  put.add(kv); //沒有設置版本,沒有設置列導入,有設置列導出 } } } return put; } /*** * (有版本,無列導入,有列導出)或者(有版本,有列導入,無列導出) * @param put * @param value * @param tTableMap * @return */ private Put getPut2(Put put,Result value,HashMap<String, String> tableMap){ NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map=value.getMap();  for(byte[] family:map.keySet()){   if(tableMap.containsKey(new String(family))){   String columnStr = tableMap.get(new String(family));   log.info("@@@@@@@@@@@"+new String(family)+" "+columnStr); ArrayList<String> columnBy = toByte(columnStr);   NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(family);//列簇作為key獲取其中的列相關數據    for(byte[] column:familyMap.keySet()){        //根據列名循壞     log.info("!!!!!!!!!!!"+new String(column));     if(columnBy.contains(new String(column))){     NavigableMap<Long, byte[]> valuesMap = familyMap.get(column);      for(Entry<Long, byte[]> s:valuesMap.entrySet()){//獲取列對應的不同版本數據,默認最新的一個      System.out.println("***:"+new String(family)+" "+new String(column)+" "+s.getKey()+" "+new String(s.getValue()));      put.addColumn(family, column, s.getKey(),s.getValue());      }     }    }   }     } return put;  } /*** * 有版本、無列導入、無列導出 * @param put * @param value * @return */ private Put getPut1(Put put,Result value){ NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map=value.getMap();  for(byte[] family:map.keySet()){    NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(family);//列簇作為key獲取其中的列相關數據   for(byte[] column:familyMap.keySet()){        //根據列名循壞    NavigableMap<Long, byte[]> valuesMap = familyMap.get(column);    for(Entry<Long, byte[]> s:valuesMap.entrySet()){    //獲取列對應的不同版本數據,默認最新的一個     put.addColumn(family, column, s.getKey(),s.getValue());    }   }  }  return put; } // str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"} /*** * 得到列簇名與列名的k,v形式的map * @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"} * @return map => {"cf1" => "c1,c2,c10,c11,c14"} */ private static HashMap<String, String> getFamilyAndColumn(String[] str){ HashMap<String, String> map = new HashMap<>(); HashSet<String> set = new HashSet<>(); for(String s : str){ set.add(s.split(":")[0]); } Object[] ob = set.toArray(); for(int i=0; i<ob.length;i++){ String family = String.valueOf(ob[i]); String columns = ""; for(int j=0;j < str.length;j++){ if(family.equals(str[j].split(":")[0])){  columns += str[j].split(":")[1]+","; } } map.put(family, columns.substring(0, columns.length()-1)); } return map;  }  private static ArrayList<String> toByte(String s){ ArrayList<String> b = new ArrayList<>(); String[] sarr = s.split(","); for(int i=0;i<sarr.length;i++){ b.add(sarr[i]); } return b; }}

程序運行完之后,在hbase shell中查看每個表,看是否數據導入正確:

test2:(無版本、無列導入設置、無列導出設置)

test3 (無版本、有列導入設置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導出設置)

test4(無版本、無列導入設置、有列導出設置("cf1:c1,cf1:c10,cf1:c14"))

test5(有版本、無列導入設置、無列導出設置)

test6(有版本、有列導入設置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導出設置)

test7(有版本、無列導入設置、有列導出設置("cf1:c1,cf1:c10,cf1:c14"))

test8(有版本、有列導入設置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列導出設置("cf1:c1,cf1:c10,cf1:c14"))

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持武林網。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
欧美在线视频在线播放完整版免费观看| 欧亚精品在线观看| 日韩av手机在线看| 久久精彩免费视频| 国产精品久久激情| 亚洲一区国产精品| 日韩免费中文字幕| 欧美激情精品久久久久久变态| 欧美日韩不卡合集视频| 国产精自产拍久久久久久蜜| 久久久www成人免费精品| 国产成人精品a视频一区www| 欧美激情视频网站| 欧洲成人在线视频| 精品国产91久久久久久| 日韩美女视频免费在线观看| 日本高清视频一区| 日韩在线观看精品| xxxx性欧美| 午夜精品视频网站| 成人欧美一区二区三区黑人| 久久99精品视频一区97| 日韩成人在线电影网| 成人激情综合网| 在线观看亚洲区| 日韩欧美精品在线观看| 久久精品影视伊人网| 午夜精品久久久久久久白皮肤| 日韩性xxxx爱| 亚洲精品电影在线观看| 亚洲精品久久7777777| 久久中文字幕国产| 国产精品视频内| 91免费的视频在线播放| 国产精品精品一区二区三区午夜版| 日韩在线视频国产| 日产精品久久久一区二区福利| 国产成人一区二区三区小说| 欧美激情网站在线观看| 精品一区二区三区四区| 国产精品xxxxx| 成人a视频在线观看| 在线视频精品一| 成人激情在线观看| 国产成人涩涩涩视频在线观看| 亚洲人成毛片在线播放| 国产一区二区三区免费视频| 欧美激情欧美激情| 国产精品第七影院| 国产ts一区二区| 狠狠操狠狠色综合网| 日韩av一区在线观看| 狠狠爱在线视频一区| 欧美亚洲激情视频| 欧美大全免费观看电视剧大泉洋| 欧美精品日韩www.p站| 国产日韩精品视频| 91香蕉嫩草神马影院在线观看| 国产成人亚洲综合91精品| 久久99国产精品自在自在app| 亚洲免费影视第一页| 亚洲精品自产拍| 国产精品视频免费观看www| 性日韩欧美在线视频| 欧美日韩亚洲精品内裤| 一区二区欧美在线| 在线精品91av| 久久九九热免费视频| 国产精品稀缺呦系列在线| 亚洲黄色在线看| 久久亚洲精品国产亚洲老地址| 国自产精品手机在线观看视频| 久热精品视频在线观看| 久久精品中文字幕免费mv| 亚洲美女性生活视频| 黄色成人在线播放| 在线播放日韩av| 亚洲精品999| 欧美成人第一页| 国产精品久久久久久久久男| 91免费看国产| 91av视频在线播放| 亚洲国产精品99久久| 国产999在线| 国产精品美女久久久久久免费| 国产精品第一第二| 欧美大尺度在线观看| 国产欧美一区二区三区视频| 欧美做爰性生交视频| 欧美中文字幕视频| 亚洲欧洲中文天堂| 亚洲精品中文字幕av| 国产精品91一区| 欧美男插女视频| 日韩亚洲成人av在线| 成人精品久久av网站| 国产精品www| 亚洲色图18p| 色综合伊人色综合网| 国产日韩精品入口| 国产成人av在线播放| 国产精品久久91| 亚洲三级 欧美三级| 欧美在线视频网站| 亚洲成av人乱码色午夜| 国产一区二区黑人欧美xxxx| 韩国三级日本三级少妇99| 久久五月天色综合| 欧美猛少妇色xxxxx| 色婷婷**av毛片一区| 久久国产精品久久久久久久久久| 久久艳片www.17c.com| 国产精品h片在线播放| 国产精品美女久久久久av超清| 97香蕉超级碰碰久久免费软件| 成人啪啪免费看| 亚洲欧洲av一区二区| 91久久久亚洲精品| 欧美综合在线观看| 91欧美精品成人综合在线观看| 亚洲精品aⅴ中文字幕乱码| 日本不卡视频在线播放| 在线丨暗呦小u女国产精品| 色综合久久中文字幕综合网小说| 日韩电影免费观看中文字幕| 亚洲欧洲视频在线| 激情成人在线视频| 亚洲欧美一区二区激情| 国产日韩欧美另类| 亚洲白拍色综合图区| 亚洲乱码一区二区| 日本精品免费观看| 欧美极品少妇xxxxⅹ喷水| 日本高清视频精品| 色综久久综合桃花网| 亚洲激情视频网站| 欧美性xxxxxxx| 97精品国产91久久久久久| 国产精品无av码在线观看| 国产美女直播视频一区| 国内精品视频一区| 日本乱人伦a精品| 欧美日韩国产一区中文午夜| 久久久噜噜噜久噜久久| 精品国产一区二区三区在线观看| 国产91在线播放精品91| 欧美人在线观看| 国产精品一区=区| 亚洲美女在线视频| 国产成人高潮免费观看精品| 国产精品入口日韩视频大尺度| 国产精品自产拍在线观看| 亚洲色图美腿丝袜| 中文字幕欧美专区| 国产精品ⅴa在线观看h| 国产成人精品999| 久久男人的天堂| 日韩亚洲成人av在线| 欧美成人免费一级人片100| 日韩在线视频观看正片免费网站| 国产精品入口免费视| 一区二区在线视频播放| 美女久久久久久久| 浅井舞香一区二区|