亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 數據庫 > SQL Server > 正文

Spark SQL數據加載和保存實例講解

2024-08-31 01:04:17
字體:
來源:轉載
供稿:網友

一、前置知識詳解
Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作,
Load:可以創建DataFrame,
Save:把DataFrame中的數據保存到文件或者說與具體的格式來指明我們要讀取的文件的類型以及與具體的格式來指出我們要輸出的文件是什么類型。

二、Spark SQL讀寫數據代碼實戰

import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.*;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.util.ArrayList;import java.util.List;public class SparkSQLLoadSaveOps { public static void main(String[] args) {  SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");  JavaSparkContext sc = new JavaSparkContext(conf);  SQLContext = new SQLContext(sc);  /**   * read()是DataFrameReader類型,load可以將數據讀取出來   */  DataFrame peopleDF = sqlContext.read().format("json").load("E://Spark//Sparkinstanll_package//Big_Data_Software//spark-1.6.0-bin-hadoop2.6//examples//src//main//resources//people.json");  /**   * 直接對DataFrame進行操作   * Json: 是一種自解釋的格式,讀取Json的時候怎么判斷其是什么格式?   * 通過掃描整個Json。掃描之后才會知道元數據   */  //通過mode來指定輸出文件的是append。創建新文件來追加文件 peopleDF.select("name").write().mode(SaveMode.Append).save("E://personNames"); }}

讀取過程源碼分析如下:
1. read方法返回DataFrameReader,用于讀取數據。

/** * :: Experimental :: * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]]. * {{{ *  sqlContext.read.parquet("/path/to/file.parquet") *  sqlContext.read.schema(schema).json("/path/to/file.json") * }}} * * @group genericdata * @since 1.4.0 */@Experimental//創建DataFrameReader實例,獲得了DataFrameReader引用def read: DataFrameReader = new DataFrameReader(this)

2.  然后再調用DataFrameReader類中的format,指出讀取文件的格式。

/** * Specifies the input data source format. * * @since 1.4.0 */def format(source: String): DataFrameReader = { this.source = source this}

3.  通過DtaFrameReader中load方法通過路徑把傳入過來的輸入變成DataFrame。

/** * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by * a local or distributed file system). * * @since 1.4.0 */// TODO: Remove this one in Spark 2.0.def load(path: String): DataFrame = { option("path", path).load()}

至此,數據的讀取工作就完成了,下面就對DataFrame進行操作。
下面就是寫操作?。?!

1. 調用DataFrame中select函數進行對列篩選

/** * Selects a set of columns. This is a variant of `select` that can only select * existing columns using column names (i.e. cannot construct expressions). * * {{{ *  // The following two are equivalent: *  df.select("colA", "colB") *  df.select($"colA", $"colB") * }}} * @group dfops * @since 1.3.0 */@scala.annotation.varargsdef select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

2.  然后通過write將結果寫入到外部存儲系統中。

/** * :: Experimental :: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @group output * @since 1.4.0 */@Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)

3.   在保持文件的時候mode指定追加文件的方式

/** * Specifies the behavior when data or table already exists. Options include:// Overwrite是覆蓋 *  - `SaveMode.Overwrite`: overwrite the existing data.//創建新的文件,然后追加 *  - `SaveMode.Append`: append the data. *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op). *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @since 1.4.0 */def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this}

4.   最后,save()方法觸發action,將文件輸出到指定文件中。

/** * Saves the content of the [[DataFrame]] at the specified path. * * @since 1.4.0 */def save(path: String): Unit = { this.extraOptions += ("path" -> path) save()}

三、Spark SQL讀寫整個流程圖如下

Spark,SQL數據加載和保存,SQL數據保存,SQL讀寫數據

四、對于流程中部分函數源碼詳解

DataFrameReader.Load()

1. Load()返回DataFrame類型的數據集合,使用的數據是從默認的路徑讀取。

/** * Returns the dataset stored at path as a DataFrame, * using the default data source configured by spark.sql.sources.default. * * @group genericdata * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0. */@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")def load(path: String): DataFrame = {//此時的read就是DataFrameReader read.load(path)}

2.  追蹤load源碼進去,源碼如下:
在DataFrameReader中的方法。Load()通過路徑把輸入傳進來變成一個DataFrame。

/**  * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by * a local or distributed file system). * * @since 1.4.0 */// TODO: Remove this one in Spark 2.0.def load(path: String): DataFrame = { option("path", path).load()}

3.  追蹤load源碼如下:

/** * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external * key-value stores). * * @since 1.4.0 */def load(): DataFrame = {//對傳入的Source進行解析 val resolved = ResolvedDataSource(  sqlContext,  userSpecifiedSchema = userSpecifiedSchema,  partitionColumns = Array.empty[String],  provider = source,  options = extraOptions.toMap) DataFrame(sqlContext, LogicalRelation(resolved.relation))}

DataFrameReader.format()

1. Format:具體指定文件格式,這就獲得一個巨大的啟示是:如果是Json文件格式可以保持為Parquet等此類操作。
Spark SQL在讀取文件的時候可以指定讀取文件的類型。例如,Json,Parquet.

/** * Specifies the input data source format.Built-in options include “parquet”,”json”,etc. * * @since 1.4.0 */def format(source: String): DataFrameReader = { this.source = source //FileType this}

DataFrame.write()

1. 創建DataFrameWriter實例

/** * :: Experimental :: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @group output * @since 1.4.0 */@Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)1

2.  追蹤DataFrameWriter源碼如下:
以DataFrame的方式向外部存儲系統中寫入數據。

/** * :: Experimental :: * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems, * key-value stores, etc). Use [[DataFrame.write]] to access this. * * @since 1.4.0 */@Experimentalfinal class DataFrameWriter private[sql](df: DataFrame) {

DataFrameWriter.mode()

1. Overwrite是覆蓋,之前寫的數據全都被覆蓋了。
Append:是追加,對于普通文件是在一個文件中進行追加,但是對于parquet格式的文件則創建新的文件進行追加。

/** * Specifies the behavior when data or table already exists. Options include: *  - `SaveMode.Overwrite`: overwrite the existing data. *  - `SaveMode.Append`: append the data. *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).//默認操作 *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. * * @since 1.4.0 */def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this}

2.  通過模式匹配接收外部參數

/** * Specifies the behavior when data or table already exists. Options include: *  - `overwrite`: overwrite the existing data. *  - `append`: append the data. *  - `ignore`: ignore the operation (i.e. no-op). *  - `error`: default option, throw an exception at runtime. * * @since 1.4.0 */def mode(saveMode: String): DataFrameWriter = { this.mode = saveMode.toLowerCase match {  case "overwrite" => SaveMode.Overwrite  case "append" => SaveMode.Append  case "ignore" => SaveMode.Ignore  case "error" | "default" => SaveMode.ErrorIfExists  case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +   "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.") } this}

DataFrameWriter.save()

1. save將結果保存傳入的路徑。

/** * Saves the content of the [[DataFrame]] at the specified path. * * @since 1.4.0 */def save(path: String): Unit = { this.extraOptions += ("path" -> path) save()}

2.  追蹤save方法。

/** * Saves the content of the [[DataFrame]] as the specified table. * * @since 1.4.0 */def save(): Unit = { ResolvedDataSource(  df.sqlContext,  source,  partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),  mode,  extraOptions.toMap,  df)}

3.  其中source是SQLConf的defaultDataSourceName
private var source: String = df.sqlContext.conf.defaultDataSourceName
其中DEFAULT_DATA_SOURCE_NAME默認參數是parquet。

// This is used to set the default data sourceval DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default", defaultValue = Some("org.apache.spark.sql.parquet"), doc = "The default data source to use in input/output.")

DataFrame.scala中部分函數詳解:

1. toDF函數是將RDD轉換成DataFrame

/** * Returns the object itself. * @group basic * @since 1.3.0 */// This is declared with parentheses to prevent the Scala compiler from treating// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.def toDF(): DataFrame = this

2.  show()方法:將結果顯示出來

/** * Displays the [[DataFrame]] in a tabular form. For example: * {{{ *  year month AVG('Adj Close) MAX('Adj Close) *  1980 12  0.503218    0.595103 *  1981 01  0.523289    0.570307 *  1982 02  0.436504    0.475256 *  1983 03  0.410516    0.442194 *  1984 04  0.450090    0.483521 * }}} * @param numRows Number of rows to show * @param truncate Whether truncate long strings. If true, strings more than 20 characters will *       be truncated and all cells will be aligned right * * @group action * @since 1.5.0 */// scalastyle:off printlndef show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))// scalastyle:on println

追蹤showString源碼如下:showString中觸發action收集數據。

/** * Compose the string representing rows for output * @param _numRows Number of rows to show * @param truncate Whether truncate long strings and align cells right */private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = { val numRows = _numRows.max(0) val sb = new StringBuilder val takeResult = take(numRows + 1) val hasMoreData = takeResult.length > numRows val data = takeResult.take(numRows) val numCols = schema.fieldNames.length

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VeVb武林網。


注:相關教程知識閱讀請移步到MSSQL教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
亚洲精品中文字幕女同| 色777狠狠综合秋免鲁丝| 原创国产精品91| 欧美激情亚洲另类| 国产日韩视频在线观看| 韩剧1988在线观看免费完整版| 国产精品劲爆视频| 亚洲成avwww人| 国产精品99久久久久久久久久久久| 韩国三级日本三级少妇99| 国产精品无av码在线观看| 人体精品一二三区| 色悠久久久久综合先锋影音下载| 国产丝袜一区二区三区免费视频| 成人中文字幕在线观看| 日韩欧美综合在线视频| 欧美剧在线观看| 欧美黑人极品猛少妇色xxxxx| 色999日韩欧美国产| 在线成人激情黄色| 欧美激情aaaa| 日韩激情av在线播放| 欧美亚洲成人xxx| 欧美视频专区一二在线观看| 国产精品欧美激情| 欧美做爰性生交视频| 欧美与欧洲交xxxx免费观看| 亚洲成人激情小说| 国产成+人+综合+亚洲欧美丁香花| 久久久久久亚洲精品| 精品女厕一区二区三区| 国产精品福利久久久| 国产伦精品一区二区三区精品视频| 成人a视频在线观看| 91色视频在线导航| 91成人福利在线| 国产日韩中文在线| 久久久久www| 日本精品va在线观看| 国外成人在线播放| 97在线看免费观看视频在线观看| 成人激情在线观看| 久久久亚洲国产天美传媒修理工| 日韩在线免费观看视频| 日韩在线视频免费观看| 在线亚洲国产精品网| 欧美成人第一页| 欧美极品少妇与黑人| 日韩一区二区av| 亚洲最大成人网色| 欧美第一黄色网| 久久精品免费播放| 欧美自拍视频在线| 国产成人在线一区二区| 成人精品在线观看| 一道本无吗dⅴd在线播放一区| 麻豆国产va免费精品高清在线| 78m国产成人精品视频| 欧美日韩国产在线播放| 日韩精品中文字幕在线播放| 国产精品第一区| 亚洲欧美日本精品| 久久福利视频网| 日韩黄色高清视频| 日韩高清电影免费观看完整| 国产精品视频久久久久| 91av视频在线| www.亚洲人.com| 亚洲综合日韩在线| 亚洲变态欧美另类捆绑| 国产成人免费av| 国产亚洲精品高潮| 欧美电影免费在线观看| 欧美大奶子在线| www.亚洲人.com| 欧美性高潮床叫视频| 日韩视频免费看| 日本久久91av| 亚洲第一区在线| 亚洲自拍偷拍色图| 日韩暖暖在线视频| 久久精品国产清自在天天线| 欧美视频一区二区三区…| 91精品成人久久| 日本精品久久久久久久| 成人免费观看49www在线观看| 欧美性xxxx极品hd满灌| 热久久美女精品天天吊色| 亚洲午夜未满十八勿入免费观看全集| 在线视频欧美日韩精品| 亚洲天堂男人的天堂| 性欧美暴力猛交69hd| 最新中文字幕亚洲| 日韩中文在线中文网在线观看| 日韩专区在线播放| 欧美专区第一页| 91精品啪在线观看麻豆免费| 在线精品视频视频中文字幕| 精品高清一区二区三区| 亚洲在线一区二区| 中文字幕久久精品| 国产精品一区二区久久国产| 91在线网站视频| 亚洲国产成人久久| 在线观看欧美视频| 欧美性猛交xxxx乱大交极品| 欧美午夜视频在线观看| 视频在线观看一区二区| 91免费国产视频| 国产精品久久久久久超碰| 日韩免费在线电影| 欧美在线影院在线视频| 欧美性xxxxxx| 青青草原一区二区| 色综合久久88色综合天天看泰| 精品小视频在线| 国产在线观看一区二区三区| 日韩一区二区三区xxxx| 日韩风俗一区 二区| 亚洲欧美日韩另类| 日韩精品久久久久| 亚洲国产精品人人爽夜夜爽| 亚洲最大福利视频网站| 久久久亚洲国产| 一区二区在线免费视频| 欧美性猛交xxxx黑人| 日韩中文字幕第一页| 欧美怡春院一区二区三区| 狠狠色香婷婷久久亚洲精品| 国产精品久久久久久久美男| 亚洲色图av在线| 国产精品免费久久久| 亚洲丁香婷深爱综合| 这里只有精品视频在线| 91影院在线免费观看视频| xxx一区二区| 欧美日韩国产丝袜另类| 久久国产一区二区三区| 国产91热爆ts人妖在线| 国产日韩在线一区| 欧美激情视频三区| 久久久久成人网| 美女扒开尿口让男人操亚洲视频网站| 红桃av永久久久| 97avcom| 欧美在线视频免费| 国产精品九九九| 中文字幕欧美精品日韩中文字幕| 国产91亚洲精品| 欧美理论电影在线观看| 欧美高清不卡在线| 欧美精品在线第一页| 国产精品欧美日韩久久| 亚洲成成品网站| 一本色道久久88亚洲综合88| 国产精品人成电影在线观看| 97久久超碰福利国产精品…| 九九热这里只有在线精品视| 精品国产一区二区三区久久久狼| 亚洲欧美中文日韩在线| 色噜噜亚洲精品中文字幕| 国产日韩欧美另类| 亚洲精品v欧美精品v日韩精品| 欧美激情精品久久久久久久变态|