18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1 18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1 18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0 18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0基站B:18611132889,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1 18688888888,20160327075100,9F36407EAD0629FC166F14DDE7970F68,1 18611132889,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0 18688888888,20160327081300,9F36407EAD0629FC166F14DDE7970F68,0 18688888888,20160327175000,9F36407EAD0629FC166F14DDE7970F68,1 18611132889,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1 18688888888,20160327220000,9F36407EAD0629FC166F14DDE7970F68,0 18611132889,20160327230000,9F36407EAD0629FC166F14DDE7970F68,0基站C:18611132889,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1 18688888888,20160327081200,CC0710CC94ECC657A8561DE549D940E0,1 18688888888,20160327081900,CC0710CC94ECC657A8561DE549D940E0,0 18611132889,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0 18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1 18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,0 18611132889,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1 18611132889,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0 下面是基站表的數據,共4個字段,分別代表基站id和經緯度以及信號的輻射類型(比如2G信號、3G信號和4G信號):9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6 CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6 16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6 基于以上3個基站的日志數據,要求計算某個手機號碼在一天之內出現最多的兩個地點。因為一個手機號碼可能一天當中可能會經過很多的基站,可能他在家停留了10個小時,在公司停留了8個小時,還有可能坐車的時候路過了一些基站。思路: 求每個手機號碼在哪些基站下面停留的時間最長,在計算的時候,用"手機號碼+基站"才能定位在哪個基站下面停留的時間, 因為每個基站下面會有很多的用戶的日志數據。全國有很多的基站,每個電信分公司只負責計算自己的數據。數據存放在基站下面的機房的服務器上。一般是用過一些工具通過網絡把這些數據搜集過來。搜集過來的數據量可能會很大,這些數據一般會存放到分布式的文件系統中,比如存放到HDFS中。我們可能會基于一周或者一個月的數據量來計算,時間跨度越大,計算出來的結構就越精確。相關資料在"Spark資料"中。重要:寫好的spark程序,如果我不想每次都提交到spark集群上面運行,可以在程序中指定"在本地運行模式",也就是如下方式:new SparkConf().setAppName("xxxx").setMaster("local")它表示要在本地模擬一個程序來運行,它并沒有把它提交到集群。但是,這種方式在linux和Mac系統中沒有問題,而在Windows下會有異常。因為我們的spark程序要從hdfs中讀數據,所以它要用到hadoop的InputFormat來讀數據,如果要在windows下面進行本地調試,需要做一些事情。我們知道hadoop要壓縮和解壓縮,那么壓縮和解壓縮所需要的都是一些c或c++編寫的庫,而這些c或c++編寫的庫文件是不跨平臺的。所以要在windows下面調試就必須先把這些庫安裝好。我們建議在linux下面進行調試,如果你沒有Mac系統的話,可以在linux虛擬機上安裝一個idea開發工具?使用Linux的圖形界面來調試。下面是完整的代碼:import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object MobileLocation { def main(args: Array[String]) { val conf = new SparkConf().setAppName("MobileLocation").setMaster("local[2]") val sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile(args(0)) //切分 //lines.map(_.split(",")).map(arr => (arr(0), arr(1).toLong, arr(2), args(3))) val splited = lines.map(line => { val fields = line.split(",") val mobile = fields(0) val lac = fields(2) val tp = fields(3) val time = if(tp == "1") -fields(1).toLong else fields(1).toLong //拼接數據 ((mobile, lac), time) }) //分組聚合 val reduced : RDD[((String, String), Long)] = splited.reduceByKey(_+_) val lmt = reduced.map(x => { //(基站,(手機號, 時間)) (x._1._2, (x._1._1, x._2)) }) //連接 val lacInfo: RDD[String] = sc.textFile(args(1)) //整理基站數據 val splitedLacInfo = lacInfo.map(line => { val fields = line.split(",") val id = fields(0) val x = fields(1) val y = fields(2) (id, (x, y)) }) //連接jion val joined: RDD[(String, ((String, Long), (String, String)))] = lmt.join(splitedLacInfo) PRintln(joined.collect().toBuffer) sc.stop() }}
新聞熱點
疑難解答