Hadoop streaming
Hadoop provides several APIs to the MapReduce framework, which makes it convenient to write MapReduce programs in languages other than Java. The one introduced here is the Hadoop Streaming API. Hadoop Streaming uses Unix standard streams as the interface between your MapReduce program and the MapReduce framework, so you can write MapReduce programs in any language that can read from standard input and write to standard output.
Streaming is a natural fit for text processing, and only for plain text: for scenarios that need objects and serialization, Hadoop Streaming cannot help. Its aim is to let us process large volumes of text files quickly with all kinds of scripting languages.
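Because the interface is nothing more than standard input and output, even stock Unix utilities can act as mapper and reducer. Here is a minimal sketch, modeled on the example in the Hadoop Streaming documentation (the /bin/cat and /usr/bin/wc paths and the output_wc directory name are assumptions):

```
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -input input/ncdc/sample.txt \
  -output output_wc \
  -mapper /bin/cat \
  -reducer /usr/bin/wc
```

Here cat passes each record through unchanged, and wc counts the lines, words, and bytes that reach the reducer.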
Commonly used Streaming programming languages:
Ruby
Here is an example of a MapReduce program written in Ruby:
Map
max_temperature_map.rb:
```ruby
#!/usr/bin/env ruby

STDIN.each_line do |line|
  val = line
  year, temp, q = val[15,4], val[87,5], val[92,1]
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end
```
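Because the mapper only reads standard input and writes standard output, it can be tried outside Hadoop with an ordinary pipe (assuming a local copy of the sample file and an executable script, chmod +x):

```
% cat input/ncdc/sample.txt | ch02/src/main/ruby/max_temperature_map.rb
```

This prints one tab-separated year/temperature pair per input record that passes the quality check.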
Reduce
max_temperature_reduce.rb:
```ruby
#!/usr/bin/env ruby

last_key, max_val = nil, -1000000
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key, max_val = key, [max_val, val.to_i].max
  end
end
puts "#{last_key}\t#{max_val}" if last_key
```
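The complete job can be simulated locally as well, with sort standing in for the framework's shuffle-and-sort phase:

```
% cat input/ncdc/sample.txt | \
    ch02/src/main/ruby/max_temperature_map.rb | \
    sort | \
    ch02/src/main/ruby/max_temperature_reduce.rb
```

This is a quick way to debug both scripts before submitting them to a cluster.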
Run
```
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -input input/ncdc/sample.txt \
  -output output \
  -mapper ch02/src/main/ruby/max_temperature_map.rb \
  -reducer ch02/src/main/ruby/max_temperature_reduce.rb
```
Python
Map
```python
#!/usr/bin/env python

import re
import sys

for line in sys.stdin:
  val = line.strip()
  (year, temp, q) = (val[15:19], val[87:92], val[92:93])
  if (temp != "+9999" and re.match("[01459]", q)):
    print "%s\t%s" % (year, temp)
```
Reduce
```python
#!/usr/bin/env python

import sys

(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
  (key, val) = line.strip().split("\t")
  if last_key and last_key != key:
    print "%s\t%s" % (last_key, max_val)
    (last_key, max_val) = (key, int(val))
  else:
    (last_key, max_val) = (key, max(max_val, int(val)))

if last_key:
  print "%s\t%s" % (last_key, max_val)
```
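Note that both scripts use Python 2 syntax (the print statement and sys.maxint); under Python 3 they would need print() and -sys.maxsize instead. They can be tested locally the same way as the Ruby versions (assuming the scripts live under ch02/src/main/python, mirroring the Ruby layout):

```
% cat input/ncdc/sample.txt | \
    ch02/src/main/python/max_temperature_map.py | \
    sort | \
    ch02/src/main/python/max_temperature_reduce.py
```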
Run
```
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -input input/ncdc/sample.txt \
  -output output \
  -mapper ch02/src/main/python/max_temperature_map.py \
  -reducer ch02/src/main/python/max_temperature_reduce.py
```
Bash shell
Map
```bash
#!/usr/bin/env bash
# NLineInputFormat gives a single line: key is offset, value is S3 URI
read offset s3file

# Retrieve file from S3 to local disk
echo "reporter:status:Retrieving $s3file" >&2
$HADOOP_INSTALL/bin/hadoop fs -get $s3file .

# Un-bzip and un-tar the local file
target=`basename $s3file .tar.bz2`
mkdir -p $target
echo "reporter:status:Un-tarring $s3file to $target" >&2
tar jxf `basename $s3file` -C $target

# Un-gzip each station file and concat into one file
echo "reporter:status:Un-gzipping $target" >&2
for file in $target/*/*
do
  gunzip -c $file >> $target.all
  echo "reporter:status:Processed $file" >&2
done

# Put gzipped version into HDFS
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz
```
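The reporter:status lines written to standard error are interpreted by Streaming as task status updates; together with the long task timeout set in the run command below, this keeps a slow S3 download from being treated as a hung task.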
Run
```
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -D mapred.reduce.tasks=0 \
  -D mapred.map.tasks.speculative.execution=false \
  -D mapred.task.timeout=12000000 \
  -input ncdc_files.txt \
  -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
  -output output \
  -mapper load_ncdc_map.sh \
  -file load_ncdc_map.sh
```
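Here mapred.reduce.tasks=0 makes this a map-only job, so the script's writes to HDFS are the job's real output; speculative execution is disabled so that the same file is not fetched and stored twice; and the large mapred.task.timeout gives each task plenty of time to download and repack its file.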
Combiner
A combiner can still be used in Streaming mode, in one of two ways:

1. Specify a combiner with the -combiner option (newer Streaming releases accept any streaming command here; older releases accepted only a Java class).
2. Simulate the combiner on the map side by chaining the map script, sort, and the reduce script together with pipes inside the -mapper argument.

The second method is explained in detail here:
```
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -input input/ncdc/all \
  -output output \
  -mapper "ch02/src/main/ruby/max_temperature_map.rb | sort | ch02/src/main/ruby/max_temperature_reduce.rb" \
  -reducer ch02/src/main/ruby/max_temperature_reduce.rb \
  -file ch02/src/main/ruby/max_temperature_map.rb \
  -file ch02/src/main/ruby/max_temperature_reduce.rb
```
Note the -mapper line: through a pipe, the map script's intermediate output is sorted by sort and then fed through the reduce script, which performs combiner-like aggregation on the map side. Only this combined output becomes the real input to the shuffle, where it is partitioned and sent over the network to the reduce tasks.
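For comparison, on Streaming versions whose -combiner option accepts an arbitrary command (the first method above), the same map-side aggregation can be requested directly; a sketch reusing the reduce script as the combiner:

```
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -input input/ncdc/all \
  -output output \
  -mapper ch02/src/main/ruby/max_temperature_map.rb \
  -combiner ch02/src/main/ruby/max_temperature_reduce.rb \
  -reducer ch02/src/main/ruby/max_temperature_reduce.rb \
  -file ch02/src/main/ruby/max_temperature_map.rb \
  -file ch02/src/main/ruby/max_temperature_reduce.rb
```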