Spark SQL Study Notes (load and save operations)
Published: 2019-06-28


load operation: loads data and creates a DataFrame from it.

save operation: writes the data in a DataFrame out to files.

Code example (the data source type defaults to parquet)

package wujiadong_sparkSQL

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object GenericLoadSave {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GenericLoadSave")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // load() reads parquet files by default
    val usersDF = sqlContext.read.load("hdfs://master:9000/student/2016113012/spark/users.parquet")
    // save() likewise writes parquet by default
    usersDF.write.save("hdfs://master:9000/student/2016113012/parquet_out1")
  }
}

Submit the job to the cluster

hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.GenericLoadSave  --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar

After it finishes, check that the output was saved

hadoop@slave01:~$ hadoop fs -ls /student/2016113012/parquet_out1
17/02/03 12:06:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
-rw-r--r--   3 hadoop supergroup          0 2017-02-03 12:05 /student/2016113012/parquet_out1/_SUCCESS
-rw-r--r--   3 hadoop supergroup        476 2017-02-03 12:05 /student/2016113012/parquet_out1/_common_metadata
-rw-r--r--   3 hadoop supergroup        841 2017-02-03 12:05 /student/2016113012/parquet_out1/_metadata
-rw-r--r--   3 hadoop supergroup        864 2017-02-03 12:05 /student/2016113012/parquet_out1/part-r-00000-8025e2a8-ab06-4558-9d76-bb2cad0042cf.gz.parquet

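Manually specifying the data source type (convenient for converting between formats)

If no data source type is specified, parquet is used by default.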
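The parquet default comes from the Spark SQL setting `spark.sql.sources.default`, so it can be changed per context instead of calling `format(...)` on every read. The following is a minimal sketch, assuming the same Spark 1.x `SQLContext` API used in this post; the object name `DefaultSourceSketch` and the helper `loadWithDefault` are hypothetical and not part of the original examples.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object, not part of the original post.
object DefaultSourceSketch {
  // Sets the default data source for this SQLContext, then loads `path`
  // without calling format(...), so the new default is what gets used.
  def loadWithDefault(sqlContext: SQLContext, source: String, path: String): DataFrame = {
    sqlContext.setConf("spark.sql.sources.default", source)
    sqlContext.read.load(path)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DefaultSourceSketch")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Reads the people.json file used elsewhere in this post as JSON,
    // relying only on the changed default source.
    val peopleDF = loadWithDefault(sqlContext, "json",
      "hdfs://master:9000/student/2016113012/people.json")
    peopleDF.printSchema()
    sc.stop()
  }
}
```

Note that the setting stays in effect for later reads and writes on the same `SQLContext`, which is why an explicit `format(...)` call is usually the safer choice for a one-off conversion.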

Code example (manually specifying the data source type)

package wujiadong_sparkSQL

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object ManuallySpecifyOptions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ManuallySpecifyOptions")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // to load another format such as JSON, specify it with format(...) first
    val peopleDF = sqlContext.read.format("json").load("hdfs://master:9000/student/2016113012/people.json")
    // write only the "name" column back out as parquet
    // (the output path is spelled "sudent" here, matching the listing below)
    peopleDF.select("name").write.format("parquet").save("hdfs://master:9000/sudent/2016113012/people_out1")
  }
}

Submit the job to the cluster

hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.ManuallySpecifyOptions  --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar

Check that the job succeeded

hadoop@master:~/wujiadong$ hadoop fs -ls hdfs://master:9000/sudent/2016113012/people_out1
17/02/03 12:24:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
-rw-r--r--   3 hadoop supergroup          0 2017-02-03 12:22 hdfs://master:9000/sudent/2016113012/people_out1/_SUCCESS
-rw-r--r--   3 hadoop supergroup        207 2017-02-03 12:22 hdfs://master:9000/sudent/2016113012/people_out1/_common_metadata
-rw-r--r--   3 hadoop supergroup        327 2017-02-03 12:22 hdfs://master:9000/sudent/2016113012/people_out1/_metadata
-rw-r--r--   3 hadoop supergroup        352 2017-02-03 12:22 hdfs://master:9000/sudent/2016113012/people_out1/part-r-00000-4d1a62a4-f550-4bde-899f-35e9aabfdc0c.gz.parquet

Save Mode

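SaveMode.ErrorIfExists (default): if data already exists at the target location, throw an exception.
SaveMode.Append: if data already exists at the target location, append the new data to it.
SaveMode.Overwrite: if data already exists at the target location, delete the existing data and replace it with the new data.
SaveMode.Ignore: if data already exists at the target location, do nothing and leave the existing data as it is.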
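The four modes differ only in what happens when the target already holds data, which can be summarized as a small decision function. This is a plain Scala model of the behavior described above, not Spark's implementation; the names `SaveModeModel`, `Mode`, `Action` and `decide` are hypothetical stand-ins chosen for illustration.

```scala
// A toy model of the save-mode rules; it does not touch Spark or any file system.
object SaveModeModel {
  // Local stand-ins for Spark's SaveMode values (not the Spark classes themselves).
  sealed trait Mode
  case object ErrorIfExists extends Mode
  case object Append extends Mode
  case object Overwrite extends Mode
  case object Ignore extends Mode

  // What a writer ends up doing for a given mode and target state.
  sealed trait Action
  case object WriteNew extends Action         // nothing at the target: just write
  case object Fail extends Action             // throw an exception, write nothing
  case object AppendToExisting extends Action // keep old data, add the new data
  case object ReplaceExisting extends Action  // delete old data, then write
  case object Skip extends Action             // keep old data, write nothing

  // The mode only matters when the target already exists.
  def decide(mode: Mode, targetExists: Boolean): Action =
    if (!targetExists) WriteNew
    else mode match {
      case ErrorIfExists => Fail
      case Append        => AppendToExisting
      case Overwrite     => ReplaceExisting
      case Ignore        => Skip
    }

  def main(args: Array[String]): Unit = {
    val modes: Seq[Mode] = Seq(ErrorIfExists, Append, Overwrite, Ignore)
    for (mode <- modes)
      println(s"$mode with existing target: ${decide(mode, targetExists = true)}")
  }
}
```

In Spark itself the mode is passed to the writer, either as a `SaveMode` value or as one of the strings "error", "append", "overwrite" and "ignore".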

Code example 1

package wujiadong_sparkSQL

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object SaveModelTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SaveModelTest")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val peopleDF = sqlContext.read.format("json").load("hdfs://master:9000/student/2016113012/people.json")
    // the target is the same path as the input, so it already exists
    // (write.mode(...).save(...) replaces DataFrame.save(path, mode), which is deprecated since Spark 1.4)
    peopleDF.write.mode(SaveMode.ErrorIfExists).save("hdfs://master:9000/student/2016113012/people.json")
  }
}

This fails with an error, because with this save mode an existing target raises an exception.

Code example 2

package wujiadong_sparkSQL

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object SaveModelTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SaveModelTest")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val peopleDF = sqlContext.read.format("json").load("hdfs://master:9000/student/2016113012/people.json")
    // Overwrite replaces whatever is already at the target path
    peopleDF.write.mode(SaveMode.Overwrite).save("hdfs://master:9000/student/2016113012/people.json")
  }
}

With this mode the existing data is overwritten directly.

Reposted from: https://www.cnblogs.com/wujiadong2014/p/6516558.html
