亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

Java和scala實現 Spark RDD轉換成DataFrame的兩種方法小結

2024-07-14 08:41:10
字體:
來源:轉載
供稿:網友

一:準備數據源

在項目下新建一個student.txt文件,里面的內容為:

1,zhangsan,20 2,lisi,21 3,wanger,19 4,fangliu,18 

二:實現

Java版:

1.首先新建一個student的Bean對象,實現序列化和toString()方法,具體代碼如下:

package com.cxd.sql;import java.io.Serializable;@SuppressWarnings("serial")public class Student implements Serializable { String sid; String sname; int sage; public String getSid() {  return sid; } public void setSid(String sid) {  this.sid = sid; } public String getSname() {  return sname; } public void setSname(String sname) {  this.sname = sname; } public int getSage() {  return sage; } public void setSage(int sage) {  this.sage = sage; } @Override public String toString() {  return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]"; } }		

2.轉換,具體代碼如下

package com.cxd.sql;import java.util.ArrayList;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.RowFactory;import org.apache.spark.sql.SaveMode;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;public class TxtToParquetDemo { public static void main(String[] args) {    SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");  SparkSession spark = SparkSession.builder().config(conf).getOrCreate();  reflectTransform(spark);//Java反射  dynamicTransform(spark);//動態轉換 }  /**  * 通過Java反射轉換  * @param spark  */ private static void reflectTransform(SparkSession spark) {  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();    JavaRDD<Student> rowRDD = source.map(line -> {   String parts[] = line.split(",");   Student stu = new Student();   stu.setSid(parts[0]);   stu.setSname(parts[1]);   stu.setSage(Integer.valueOf(parts[2]));   return stu;  });    Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);  df.select("sid", "sname", "sage").  coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res"); } /**  * 動態轉換  * @param spark  */ private static void dynamicTransform(SparkSession spark) {  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();    JavaRDD<Row> rowRDD = source.map( line -> {   String[] parts = line.split(",");   String sid = parts[0];   String sname = parts[1];   int sage = Integer.parseInt(parts[2]);      return RowFactory.create(     sid,     sname,     sage     );  });    ArrayList<StructField> fields = new ArrayList<StructField>();  StructField field = null;  field = DataTypes.createStructField("sid", DataTypes.StringType, true);  fields.add(field);  field = DataTypes.createStructField("sname", DataTypes.StringType, true);  fields.add(field);  field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);  fields.add(field);    StructType schema = DataTypes.createStructType(fields);    Dataset<Row> df = spark.createDataFrame(rowRDD, schema);  df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");     } }

scala版本:

import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.types.StringTypeimport org.apache.spark.sql.types.StructFieldimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.sql.Rowimport org.apache.spark.sql.types.IntegerTypeobject RDD2Dataset {  case class Student(id:Int,name:String,age:Int) def main(args:Array[String]) {  val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate() import spark.implicits._ reflectCreate(spark) dynamicCreate(spark) }  /**	 * 通過Java反射轉換	 * @param spark	 */ private def reflectCreate(spark:SparkSession):Unit={ import spark.implicits._ val stuRDD=spark.sparkContext.textFile("student2.txt") //toDF()為隱式轉換 val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF() //stuDf.select("id","name","age").write.text("result") //對寫入文件指定列名 stuDf.printSchema() stuDf.createOrReplaceTempView("student") val nameDf=spark.sql("select name from student where age<20") //nameDf.write.text("result") //將查詢結果寫入一個文件 nameDf.show() }  /**	 * 動態轉換	 * @param spark	 */ private def dynamicCreate(spark:SparkSession):Unit={ val stuRDD=spark.sparkContext.textFile("student.txt") import spark.implicits._ val schemaString="id,name,age" val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema=StructType(fields) val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2))) val stuDf=spark.createDataFrame(rowRDD, schema)  stuDf.printSchema() val tmpView=stuDf.createOrReplaceTempView("student") val nameDf=spark.sql("select name from student where age<20") //nameDf.write.text("result") //將查詢結果寫入一個文件 nameDf.show() }}

注:

1.上面代碼全都已經測試通過,測試的環境為spark2.1.0,jdk1.8。

2.此代碼不適用于spark2.0以前的版本。

以上這篇Java和scala實現 Spark RDD轉換成DataFrame的兩種方法小結就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持VeVb武林網。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
国产精品久久久一区| 亚洲国产精品va在看黑人| 国产精品丝袜一区二区三区| 日韩成人激情在线| 在线观看国产欧美| 日韩精品高清视频| 日韩免费av一区二区| 欧美激情国产精品| 久久久国产精品视频| 国产精品亚洲美女av网站| 国产精品永久免费视频| 国产日韩精品在线| 亚洲区一区二区| 欧美激情视频三区| 日韩av在线电影网| 久久久久999| 国产91精品网站| 国产精品嫩草影院久久久| 97欧美精品一区二区三区| 成人黄色在线免费| 亚洲成人激情视频| 日韩电影在线观看中文字幕| 精品女厕一区二区三区| 国产精品视频导航| 91久久国产综合久久91精品网站| 2019中文在线观看| 久久影院在线观看| 色婷婷成人综合| 68精品国产免费久久久久久婷婷| 高清欧美性猛交| 中文欧美在线视频| 国产91免费看片| 久久露脸国产精品| 国产不卡av在线| 久久久亚洲欧洲日产国码aⅴ| 精品国产电影一区| 青青草成人在线| 自拍亚洲一区欧美另类| 国语自产精品视频在线看| 久久综合久中文字幕青草| 亚洲男人av电影| 黑人与娇小精品av专区| 欧美亚洲另类制服自拍| 91久久精品久久国产性色也91| 超碰精品一区二区三区乱码| 国产精品高清在线观看| 国产精品高潮呻吟久久av野狼| 日韩精品免费在线视频| 欧美日韩国产成人在线观看| 中国日韩欧美久久久久久久久| 欧美精品videosex牲欧美| 亚洲一级免费视频| 日韩精品在线电影| 欧美性极品少妇精品网站| 欧美午夜激情在线| 亚洲电影在线观看| 国产精品免费一区二区三区都可以| 成人免费观看49www在线观看| 国产日韩亚洲欧美| 亚洲欧美资源在线| 国产精品美女久久久久久免费| 国产成人精品国内自产拍免费看| 久久久久亚洲精品| 久久久久久久久中文字幕| 亚洲欧美日韩久久久久久| 久久国内精品一国内精品| 亚洲欧洲美洲在线综合| 国产精品国产亚洲伊人久久| 久久精品影视伊人网| 久久韩剧网电视剧| 中文字幕不卡在线视频极品| 国产精品久久精品| 久久99国产综合精品女同| 国产经典一区二区| 亚洲国产婷婷香蕉久久久久久| 夜夜躁日日躁狠狠久久88av| 色偷偷88888欧美精品久久久| 在线观看欧美视频| 97香蕉超级碰碰久久免费软件| 国产精品中文在线| 欧美色道久久88综合亚洲精品| 久久久久久亚洲精品不卡| 91久久嫩草影院一区二区| 欧美高清理论片| 国产综合色香蕉精品| 在线成人免费网站| 亚洲精品一区中文字幕乱码| 国产成人亚洲综合91精品| 亚洲美女久久久| 92福利视频午夜1000合集在线观看| 欧美日韩美女视频| 日韩在线观看免费全集电视剧网站| 欧美一级成年大片在线观看| www.久久色.com| 国产欧美日韩免费看aⅴ视频| 亚洲自拍另类欧美丝袜| 精品一区二区电影| 国产视频福利一区| 欧美性xxxxxxx| 国产精品欧美激情在线播放| 国产情人节一区| 日韩高清电影免费观看完整| 亚洲一区中文字幕| 国产精品va在线播放我和闺蜜| 伊人久久免费视频| 欧美精品福利视频| 国产69精品久久久久99| 国产精品一区二区性色av| 91精品综合视频| 久久久亚洲国产| 欧美在线免费观看| 亚洲热线99精品视频| 国产精品永久免费观看| 按摩亚洲人久久| 久久亚洲精品中文字幕冲田杏梨| 久久久久久噜噜噜久久久精品| 欧洲中文字幕国产精品| 中文字幕日韩av电影| 久久综合九色九九| 激情懂色av一区av二区av| 国产又爽又黄的激情精品视频| 操91在线视频| 亚洲精品suv精品一区二区| 国产一区二区黑人欧美xxxx| 91精品国产高清久久久久久久久| 国产精品亚洲综合天堂夜夜| 亚洲国产精品电影在线观看| 成人在线免费观看视视频| 日韩成人av网| 中文字幕亚洲二区| 国模视频一区二区三区| 91社区国产高清| 亚洲激情中文字幕| 欧美成人免费观看| 中文字幕成人在线| 国产精品免费一区二区三区都可以| 精品中文字幕在线观看| 91精品视频在线免费观看| 国产精品久久久久久久久借妻| 91大神在线播放精品| 538国产精品一区二区免费视频| 亚洲天堂av在线免费观看| 国产亚洲欧洲黄色| 久久视频免费观看| 久久久精品欧美| 综合网日日天干夜夜久久| 久久视频在线直播| 97国产在线视频| 欧美最猛性xxxxx免费| 久久精品99久久久香蕉| 精品色蜜蜜精品视频在线观看| 欧美性xxxx极品hd满灌| 日韩国产一区三区| 欧美大秀在线观看| 日韩视频免费在线观看| 欧美性猛交xxxx乱大交| 欧美激情第1页| 久久久亚洲精品视频| 久久成人这里只有精品| 一区二区三区回区在观看免费视频| 国产精品香蕉av| 欧美一区二区视频97| 亚洲xxxx做受欧美| 国产精品成人av在线|