亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁(yè) > 數(shù)據(jù)庫(kù) > SQL Server > 正文

Spark SQL數(shù)據(jù)加載和保存實(shí)例講解

2024-08-31 01:04:17
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

一、前置知識(shí)詳解
Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作,
Load:可以創(chuàng)建DataFrame,
Save:把DataFrame中的數(shù)據(jù)保存到文件或者說(shuō)與具體的格式來(lái)指明我們要讀取的文件的類型以及與具體的格式來(lái)指出我們要輸出的文件是什么類型。

二、Spark SQL讀寫數(shù)據(jù)代碼實(shí)戰(zhàn)

import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.*;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.util.ArrayList;import java.util.List;public class SparkSQLLoadSaveOps { public static void main(String[] args) {  SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");  JavaSparkContext sc = new JavaSparkContext(conf);  SQLContext = new SQLContext(sc);  /**   * read()是DataFrameReader類型,load可以將數(shù)據(jù)讀取出來(lái)   */  DataFrame peopleDF = sqlContext.read().format("json").load("E://Spark//Sparkinstanll_package//Big_Data_Software//spark-1.6.0-bin-hadoop2.6//examples//src//main//resources//people.json");  /**   * 直接對(duì)DataFrame進(jìn)行操作   * Json: 是一種自解釋的格式,讀取Json的時(shí)候怎么判斷其是什么格式?   * 通過(guò)掃描整個(gè)Json。掃描之后才會(huì)知道元數(shù)據(jù)   */  //通過(guò)mode來(lái)指定輸出文件的是append。創(chuàng)建新文件來(lái)追加文件 peopleDF.select("name").write().mode(SaveMode.Append).save("E://personNames"); }}

讀取過(guò)程源碼分析如下:
1. read方法返回DataFrameReader,用于讀取數(shù)據(jù)。

/** * :: Experimental :: * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]]. * {{{ *  sqlContext.read.parquet("/path/to/file.parquet") *  sqlContext.read.schema(schema).json("/path/to/file.json") * }}} * * @group genericdata * @since 1.4.0 */@Experimental//創(chuàng)建DataFrameReader實(shí)例,獲得了DataFrameReader引用def read: DataFrameReader = new DataFrameReader(this)

2.  然后再調(diào)用DataFrameReader類中的format,指出讀取文件的格式。

/** * Specifies the input data source format. * * @since 1.4.0 */def format(source: String): DataFrameReader = { this.source = source this}

3.  通過(guò)DtaFrameReader中l(wèi)oad方法通過(guò)路徑把傳入過(guò)來(lái)的輸入變成DataFrame。

/** * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by * a local or distributed file system). * * @since 1.4.0 */// TODO: Remove this one in Spark 2.0.def load(path: String): DataFrame = { option("path", path).load()}

至此,數(shù)據(jù)的讀取工作就完成了,下面就對(duì)DataFrame進(jìn)行操作。
下面就是寫操作?。?!

1. 調(diào)用DataFrame中select函數(shù)進(jìn)行對(duì)列篩選

/** * Selects a set of columns. This is a variant of `select` that can only select * existing columns using column names (i.e. cannot construct expressions). * * {{{ *  // The following two are equivalent: *  df.select("colA", "colB") *  df.select($"colA", $"colB") * }}} * @group dfops * @since 1.3.0 */@scala.annotation.varargsdef select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

2.  然后通過(guò)write將結(jié)果寫入到外部存儲(chǔ)系統(tǒng)中。

/** * :: Experimental :: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @group output * @since 1.4.0 */@Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)

3.   在保持文件的時(shí)候mode指定追加文件的方式

/** * Specifies the behavior when data or table already exists. Options include:// Overwrite是覆蓋 *  - `SaveMode.Overwrite`: overwrite the existing data.//創(chuàng)建新的文件,然后追加 *  - `SaveMode.Append`: append the data. *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op). *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @since 1.4.0 */def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this}

4.   最后,save()方法觸發(fā)action,將文件輸出到指定文件中。

/** * Saves the content of the [[DataFrame]] at the specified path. * * @since 1.4.0 */def save(path: String): Unit = { this.extraOptions += ("path" -> path) save()}

三、Spark SQL讀寫整個(gè)流程圖如下

Spark,SQL數(shù)據(jù)加載和保存,SQL數(shù)據(jù)保存,SQL讀寫數(shù)據(jù)

四、對(duì)于流程中部分函數(shù)源碼詳解

DataFrameReader.Load()

1. Load()返回DataFrame類型的數(shù)據(jù)集合,使用的數(shù)據(jù)是從默認(rèn)的路徑讀取。

/** * Returns the dataset stored at path as a DataFrame, * using the default data source configured by spark.sql.sources.default. * * @group genericdata * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0. */@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")def load(path: String): DataFrame = {//此時(shí)的read就是DataFrameReader read.load(path)}

2.  追蹤load源碼進(jìn)去,源碼如下:
在DataFrameReader中的方法。Load()通過(guò)路徑把輸入傳進(jìn)來(lái)變成一個(gè)DataFrame。

/**  * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by * a local or distributed file system). * * @since 1.4.0 */// TODO: Remove this one in Spark 2.0.def load(path: String): DataFrame = { option("path", path).load()}

3.  追蹤load源碼如下:

/** * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external * key-value stores). * * @since 1.4.0 */def load(): DataFrame = {//對(duì)傳入的Source進(jìn)行解析 val resolved = ResolvedDataSource(  sqlContext,  userSpecifiedSchema = userSpecifiedSchema,  partitionColumns = Array.empty[String],  provider = source,  options = extraOptions.toMap) DataFrame(sqlContext, LogicalRelation(resolved.relation))}

DataFrameReader.format()

1. Format:具體指定文件格式,這就獲得一個(gè)巨大的啟示是:如果是Json文件格式可以保持為Parquet等此類操作。
Spark SQL在讀取文件的時(shí)候可以指定讀取文件的類型。例如,Json,Parquet.

/** * Specifies the input data source format.Built-in options include “parquet”,”json”,etc. * * @since 1.4.0 */def format(source: String): DataFrameReader = { this.source = source //FileType this}

DataFrame.write()

1. 創(chuàng)建DataFrameWriter實(shí)例

/** * :: Experimental :: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @group output * @since 1.4.0 */@Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)1

2.  追蹤DataFrameWriter源碼如下:
以DataFrame的方式向外部存儲(chǔ)系統(tǒng)中寫入數(shù)據(jù)。

/** * :: Experimental :: * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems, * key-value stores, etc). Use [[DataFrame.write]] to access this. * * @since 1.4.0 */@Experimentalfinal class DataFrameWriter private[sql](df: DataFrame) {

DataFrameWriter.mode()

1. Overwrite是覆蓋,之前寫的數(shù)據(jù)全都被覆蓋了。
Append:是追加,對(duì)于普通文件是在一個(gè)文件中進(jìn)行追加,但是對(duì)于parquet格式的文件則創(chuàng)建新的文件進(jìn)行追加。

/** * Specifies the behavior when data or table already exists. Options include: *  - `SaveMode.Overwrite`: overwrite the existing data. *  - `SaveMode.Append`: append the data. *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).//默認(rèn)操作 *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @since 1.4.0 */def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this}

2.  通過(guò)模式匹配接收外部參數(shù)

/** * Specifies the behavior when data or table already exists. Options include: *  - `overwrite`: overwrite the existing data. *  - `append`: append the data. *  - `ignore`: ignore the operation (i.e. no-op). *  - `error`: default option, throw an exception at runtime. * * @since 1.4.0 */def mode(saveMode: String): DataFrameWriter = { this.mode = saveMode.toLowerCase match {  case "overwrite" => SaveMode.Overwrite  case "append" => SaveMode.Append  case "ignore" => SaveMode.Ignore  case "error" | "default" => SaveMode.ErrorIfExists  case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +   "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.") } this}

DataFrameWriter.save()

1. save將結(jié)果保存?zhèn)魅氲穆窂健?/p>

/** * Saves the content of the [[DataFrame]] at the specified path. * * @since 1.4.0 */def save(path: String): Unit = { this.extraOptions += ("path" -> path) save()}

2.  追蹤save方法。

/** * Saves the content of the [[DataFrame]] as the specified table. * * @since 1.4.0 */def save(): Unit = { ResolvedDataSource(  df.sqlContext,  source,  partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),  mode,  extraOptions.toMap,  df)}

3.  其中source是SQLConf的defaultDataSourceName
private var source: String = df.sqlContext.conf.defaultDataSourceName
其中DEFAULT_DATA_SOURCE_NAME默認(rèn)參數(shù)是parquet。

// This is used to set the default data sourceval DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default", defaultValue = Some("org.apache.spark.sql.parquet"), doc = "The default data source to use in input/output.")

DataFrame.scala中部分函數(shù)詳解:

1. toDF函數(shù)是將RDD轉(zhuǎn)換成DataFrame

/** * Returns the object itself. * @group basic * @since 1.3.0 */// This is declared with parentheses to prevent the Scala compiler from treating// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.def toDF(): DataFrame = this

2.  show()方法:將結(jié)果顯示出來(lái)

/** * Displays the [[DataFrame]] in a tabular form. For example: * {{{ *  year month AVG('Adj Close) MAX('Adj Close) *  1980 12  0.503218    0.595103 *  1981 01  0.523289    0.570307 *  1982 02  0.436504    0.475256 *  1983 03  0.410516    0.442194 *  1984 04  0.450090    0.483521 * }}} * @param numRows Number of rows to show * @param truncate Whether truncate long strings. If true, strings more than 20 characters will *       be truncated and all cells will be aligned right * * @group action * @since 1.5.0 */// scalastyle:off printlndef show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))// scalastyle:on println

追蹤showString源碼如下:showString中觸發(fā)action收集數(shù)據(jù)。

/** * Compose the string representing rows for output * @param _numRows Number of rows to show * @param truncate Whether truncate long strings and align cells right */private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = { val numRows = _numRows.max(0) val sb = new StringBuilder val takeResult = take(numRows + 1) val hasMoreData = takeResult.length > numRows val data = takeResult.take(numRows) val numCols = schema.fieldNames.length

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持VeVb武林網(wǎng)。


注:相關(guān)教程知識(shí)閱讀請(qǐng)移步到MSSQL教程頻道。
發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
欧美男同性恋视频网站| 国产日韩亚洲欧美在线| 亚洲精品9999| 第一会所sis001亚洲| 国产精品一区二区91| xxxx视频在线观看| 国产91大片| 日韩视频精品| 亚洲国产欧美日韩在线观看第一区| 久久99国产精品视频| 中文字幕42页丝袜| av免费在线电影| 91久久精品国产性色| 青青视频在线观| 国产精品久久精品日日| 久久99影院| 91精品国产99久久久久久| 超碰在线视屏| 精品国偷自产在线视频| 欧美午夜理伦三级在线观看| 中国毛片直接看| 亚洲www色| 天海翼精品一区二区三区| 免费黄色一级网站| 欧美插天视频在线播放| 粗大黑人巨茎大战欧美成人| 亚洲欧美www| av中文天堂在线| 国产伦精品一区二区三区视频孕妇| 自拍av一区二区三区| 久草免费新视频| 国产成人精品一区二区三区四区| 丁香亚洲综合激情啪啪综合| 色呦呦在线观看视频| 午夜精品久久久久久久99老熟妇| 91成人在线观看喷潮| 老司机精品导航| 国产成人精品电影久久久| 91精品国产色综合久久不卡粉嫩| 亚洲国产精品女人| 另类调教123区| 69精品无码成人久久久久久| 亚洲欧美日韩国产成人精品影院| 伊人久久av导航| 久热国产精品视频| 在线免费观看的av网站| 国产aaa一级片| 777精品久无码人妻蜜桃| 啊v视频在线| 亚洲色欲色欲www| 国产乱淫a∨片免费视频| 涩涩屋成人免费视频软件| 天天噜夜夜操| 欧美日本视频在线| 丰满少妇高潮一区二区| 九一国产在线观看| 欧美情侣在线播放| 韩国三级电影一区二区| 一本色道久久综合狠狠躁的推荐| 亚洲欧美日韩爽爽影院| 成人欧美一区二区三区白人| 欧美体内she精视频| fc2人成共享视频在线观看| 久久综合导航| 国产专区欧美精品| 成人国产精品一区二区免费麻豆| 欧美日韩久久中文字幕| 国产农村妇女毛片精品久久莱园子| 思思99精品视频在线观看| av电影在线网| www.久久久久久久| 国产乱码精品一区二区三区四区| 国产精品大全| 蜜臀av性久久久久蜜臀aⅴ流畅| h视频免费观看| 日韩欧美一区二区久久婷婷| 奇米影视亚洲色图| 男人舔女人下部高潮全视频| 午夜影院观看视频免费| 日本精品一区二区在线观看| 精品无码一区二区三区的天堂| 无限资源日本好片| 国产成人香蕉在线视频网站| 国产精品亚洲第一| 国产在线视频自拍| av欧美精品.com| 在线观看一区二区视频| av片在线看| 999久久久国产精品| 国产精品啊v在线| 综合久久伊人| 天天插天天干天天操| 日韩av手机在线免费观看| 北条麻妃在线一区二区免费播放| 国产精品久一| 中文字幕在线2021| 亚洲欧美日本一区| 91丝袜脚交足在线播放| 美女少妇一区二区| 高清不卡一区| 激情久久五月天| 欧美色图亚洲图片| 宅男噜噜噜66国产精品免费| 肉色欧美久久久久久久免费看| 日韩中文字幕免费| 日本一区二区三区四区在线视频| 你懂的网站在线观看网址| 另类视频在线观看+1080p| 欧美连裤袜在线视频| 成人av在线电影| 天堂中文字幕在线| 极品销魂一区二区三区| 色综合天天综合给合国产| 久久久久久久999精品视频| 91久久精品国产91久久性色| 一级做a爰片久久毛片美女图片| 国产精品国产三级国产aⅴ| 小说区图片区图片区另类灬| 93在线视频精品免费观看| 精品视频免费观看| 日日夜夜精品视频天天综合网| 男人天堂av片| 国产1区2区在线观看| 亚洲色图欧美制服丝袜另类第一页| 国产精品男女视频| 在线视频观看日韩| 亚洲乱码日产精品bd在线观看| 黄色片网址在线观看| 成人免费毛片app| 久久精品亚洲精品| 视频一区二区精品的福利| www免费视频观看在线| 欧美精品momsxxx| 精品美女一区二区三区| 久久亚洲视频| 久久这里只有精品99| 国产欧美另类| 爱情岛论坛亚洲品质自拍视频网站| 国产精品美女毛片真酒店| 日本一区二区高清| 国内老司机av在线| 日本三级在线视频| 91欧美日韩一区| 一本色道久久综合亚洲精品婷婷| 经典三级一区二区三区视频| 国产免费久久久| 高潮一区二区| 一区二区三区日| 噜噜噜在线观看免费视频日韩| 首页国产欧美日韩丝袜| 亚洲国产精品成人| av日韩在线免费观看| 欧美另类视频在线| 老牛影视av老牛影视av| 日韩有码欧美| 91精品国产亚洲| 黄色高清无遮挡| 欧美18一12sex性处hd| 精品人妻av一区二区三区| 欧美一区二区三区艳史| 午夜精品久久久99热福利| 中文字幕在线观看日韩| 国产精品视频h| 精品一区二区三区亚洲| 午夜精品影院在线观看| 午夜伦理大片视频在线观看| 天天av综合| 日本丰满少妇裸体自慰| 国产69精品久久久久999小说| 精品在线你懂的| 免费看黄裸体一级大秀欧美| a4yy在线播放免费观看视频| 欧美aa国产视频| 91制片厂毛片| 欧美大胆人体bbbb| 6080yy精品一区二区三区| 狠狠色综合播放一区二区| 精品国产乱码久久久久久樱花| 色婷婷av777| 中文字幕乱码亚洲精品一区| 中文字幕日本在线| 久久一级黄色片| 国产又黄又猛又粗又爽的视频| 大片免费在线看视频| 在线观看av免费观看| 亚洲欧美天堂网| 在线观看二区| 菠萝蜜影院一区二区免费| 在线播放中文字幕一区| 国产老女人精品毛片久久| 色综合天天爱| 免费看成年人视频| 国产综合av一区二区三区| **亚洲第一综合导航网站| 天天性天天草| 国产精品无码白浆高潮| 亚洲v.com| 久久小说免费下载| av电影在线免费观看| 人禽交欧美网站免费| 成人性片免费| 天天在线女人的天堂视频| 亚洲精品成人a在线观看| 久久久久国产成人精品亚洲午夜| 亚洲图片欧美日产| 欧美白人最猛性xxxxx69交| 日日噜噜噜噜人人爽亚洲精品| 5566中文字幕一区二区电影| 久久午夜剧场| sihu影院永久在线影院| 国产日韩久久久| 秋霞久久久久久一区二区| 亚洲欧美日韩天堂| 亚洲av无码乱码国产精品fc2| 免费91视频| 国产成人精品国内自产拍免费看| 日韩亚洲欧美综合| 日韩欧美不卡视频| 午夜久久久久久久| 九色porny自拍视频在线播放| 国产亚洲欧美日韩在线一区| 国产精品一在线观看| 国产一区二区三区av电影| 手机看片日韩国产| 亚洲一二三在线观看| 国产精品日日夜夜| 91av久久久| 国产91丝袜在线观看| 色一情一欲一爱一乱| 欧美高清电影在线看| 黑人性受xxxx黑人xyx性爽| 欧美两根一起进3p做受视频| 日韩av在线播放中文字幕| 日本特黄一级片| 国产女同一区二区| xfplay爱情电影网love| 国产精品美女久久久久久不卡| 手机成人在线| 992tv国产精品成人影院| 欧美在线综合| 欧美在线观看18| 国产亚洲欧美日韩精品一区二区三区| 中文字幕在线国产| 在线视频2区| 三级网站视频在在线播放| 99精品欧美一区二区三区综合在线| 久久久久亚洲av成人网人人软件| 国产精品久久久久91| 欧美色就是色| 欧美三级不卡| 亚洲系列另类av| 91免费版视频| 中文无码久久精品| 毛片网站免费观看| 午夜爽爽爽男女免费观看影院| 欧美做爰性欧美大fennong| 精品国产人妻一区二区三区| 国产精品45p| 久久久国产精华液| a毛片在线看免费观看| 在线观看国产精品日韩av| 韩国在线视频一区| theporn国产精品| www日本视频| 全球中文成人在线| 精品一二三四在线| free性欧美16hd| 青青草手机在线观看| 国产一起色一起爱| 美女视频黄久久| 69成人免费视频| 亚洲在线播放电影| 麻豆av一区二区三区久久| 国产亚洲精品熟女国产成人| 在线观看日韩www视频免费| 91超碰在线观看| 国产精品视频白浆免费视频| 亚洲精品一区二区三区四区高清| 欧美国产成人精品| 北条麻妃一区二区三区| 中文字幕日韩精品一区| 成人h动漫精品一区二区器材| 亚洲激情欧美色图| 亚洲欧美一区二区三区久本道91| 久久久99精品免费观看不卡| 成人av第一页| 韩日精品视频一区| 国产69精品久久久久9| 日本美女视频一区二区| 久久精品最新免费国产成人| 欧美凹凸一区二区三区视频| 国产精品无码永久免费不卡| 欧美精品 - 色网| www..69.hd| 夜夜爽99久久国产综合精品女不卡| 日韩一区二区三区在线| 久久躁狠狠躁夜夜爽| 成人精品久久一区二区三区| 国产亚洲欧美在线视频| 国产大尺度视频| 中文字幕在线高清| 日韩天堂在线| 亚洲高清在线观看| av免费网站在线观看| 91精品久久久久久久久久久久久| 日韩午夜在线电影| 粉嫩虎白女毛片人体| 国产精品一在线观看| 九七影院97影院理论片久久| 亚洲一区电影在线观看| 欧美v在线观看| 亚洲精品乱码久久久久久久久久久久| 欧美日韩国产大片| 69xxx视频hd| 亚洲精华一区二区三区| av在线不卡免费观看| 久草视频这里只有精品| 青青操在线视频观看| 偷偷操不一样的久久| 日韩专区视频网站| 成人乱码一区二区三区| 日韩av电影在线免费播放| 久久久久久免费看| xxxx性bbbb欧美| 国产人成亚洲第一网站在线播放| 欧美一卡2卡3卡4卡无卡免费观看水多多| 亚洲老女人av| 999香蕉视频|