大数据开发工程师-第十一周 SparkSql-5


第十一周 SparkSql-极速上手SparkSql-5

1
之前已学完,spark core,离线数据计算

Spark SQL

1
2
3
4
5
6
7
8
9
10
Spark SQL和我们之前讲Hive的时候说的hive on spark是不一样的。
hive on spark是表示把底层的mapreduce引擎替换为spark引擎。
而Spark SQL是Spark自己实现的一套SQL处理引擎。

Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。

DataFrame=RDD+Schema 。
它其实和关系型数据库中的表非常类似,RDD可以认为是表中的数据,Schema是表结构信息。
DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD
Spark1.3出现的DataFrame ,Spark1.6出现了DataSet,在Spark2.0中两者统一,DataFrame等于DataSet[Row]

SparkSession

1
2
3
4
5
要使用Spark SQL,首先需要创建一个SpakSession对象
SparkSession中包含了SparkContext和SqlContext
所以说想通过SparkSession来操作RDD的话需要先通过它来获取SparkContext

这个SqlContext是使用sparkSQL操作hive的时候会用到的。

创建DataFrame

1
2
3
4
5
6
7
8
9
使用SparkSession,可以从RDD、HIve表或者其它数据源创建DataFrame
那下面我们来使用JSON文件来创建一个DataFrame
想要使用spark-sql需要先添加spark-sql的依赖

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
1
在项目中添加sql这个包名

image-20230328153653400

scala

image-20230328154207572

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.imooc.scala.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 需求:使用json文件创建DataFrame
* Created by xuwei
*/
object SqlDemoScala{
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("SqlDemoScala")
.config(conf)
.getOrCreate()
//读取json文件,获取DataFrame
val stuDf = sparkSession.read.json("D:\\student.json")
//查看DataFrame中的数据
stuDf.show()
sparkSession.stop()
}
}

image-20230328154309744

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.imooc.java.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* 需求:使用json文件创建DataFrame
* Created by xuwei
*/
public class SqlDemoJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
//创建SparkSession对象,里面包含SparkContext和SqlContext
SparkSession sparkSession = SparkSession.builder()
.appName("SqlDemoJava")
.config(conf)
.getOrCreate();
//读取json文件,获取Dataset<Row>
Dataset<Row> stuDf = sparkSession.read().json("D:\\student.json"); // 返回的dataset其实和dataframe一样的,新版本合并了
stuDf.show();
sparkSession.stop()
}
}

image-20230328154617041

1
2
由于DataFrame等于DataSet[Row],它们两个可以互相转换,所以创建哪个都是一样的
咱们前面的scala代码默认创建的是DataFrame,java代码默认创建的是DataSet
1
2
3
4
5
6
7
8
9
尝试对他们进行转换
在Scala代码中将DataFrame转换为DataSet[Row],对后面的操作没有影响
//将DataFrame转换为DataSet[Row]
val stuDf = sparkSession.read.json("D:\\student.json").as("stu")


在Java代码中将DataSet[Row]转换为DataFrame
//将Dataset<Row>转换为DataFrame
Dataset<Row> stuDf = sparkSession.read().json("D:\\student.json").toDF();

DataFrame常见算子操作

1
2
下面来看一下Spark sql中针对DataFrame常见的算子操作
先看一下官方文档

image-20230328155250497

1
2
3
4
5
6
printSchema()
show()
select()
filter()、where()
groupBy()
count()

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.imooc.scala.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 需求:DataFrame常见操作
* Created by xuwei
*/
object DataFrameOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("DataFrameOpScala")
.config(conf)
.getOrCreate()
val stuDf = sparkSession.read.json("D:\\student.json")

//打印schema信息
stuDf.printSchema()

//默认显示所有数据,可以通过参数控制显示多少条
stuDf.show(2)

//查询数据中的指定字段信息
stuDf.select("name","age").show()

//在使用select的时候可以对数据做一些操作,需要添加隐式转换函数,否则语法报错
import sparkSession.implicits._
stuDf.select($"name",$"age" + 1).show()

//对数据进行过滤,需要添加隐式转换函数,否则语法报错
stuDf.filter($"age">18).show()

//where底层调用的就是filter
stuDf.where($"age">18).show()

//对数据进行分组求和
stuDf.groupBy("age").count().show()
sparkSession.stop()
}
}

image-20230328155651098

image-20230328155732191

image-20230328155839255

image-20230328160101149

image-20230328160159987

image-20230328160345238

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.imooc.java.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
/**
* 需求:DataFrame常见操作
* Created by xuwei
*/
public class DataFrameOpJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
//创建SparkSession对象,里面包含SparkContext和SqlContext
SparkSession sparkSession = SparkSession.builder()
.appName("DataFrameOpJava")
.config(conf)
.getOrCreate();
Dataset<Row> stuDf = sparkSession.read().json("D:\\student.json");

//打印schema信息
stuDf.printSchema();

//默认显示所有数据,可以通过参数控制显示多少条
stuDf.show(2);

//查询数据中的指定字段信息
stuDf.select("name","age").show();

//在select的时候可以对数据做一些操作,需要引入import static org.apache.spa
stuDf.select(col("name"),col("age").plus(1)).show();

//对数据进行过滤
stuDf.filter(col("age").gt(18)).show();
stuDf.where(col("age").gt(18)).show();

//对数据进行分组求和
stuDf.groupBy("age").count().show();
sparkSession.stop();
}
}
1
2
这些就是针对DataFrame的一些常见的操作。
但是现在这种方式其实用起来还是不方便,只是提供了一些类似于可以操作表的算子,很对一些简单的查询还是可以的,但是针对一些复杂的操作,使用算子写起来就很麻烦了,所以我们希望能够直接支持用sql的方式执行,Spark SQL也是支持的

DataFrame的sql操作

1
2
3
4
想要实现直接支持sql语句查询DataFrame中的数据
需要两步操作
1. 先将DataFrame注册为一个临时表
2. 使用sparkSession中的sql函数执行sql语句

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.imooc.scala.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 需求:使用sql操作DataFrame
* Created by xuwei
*/
object DataFrameSqlScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("DataFrameSqlScala")
.config(conf)
.getOrCreate()
val stuDf = sparkSession.read.json("D:\\student.json")

//将DataFrame注册为一个临时表
stuDf.createOrReplaceTempView("student")
//使用sql查询临时表中的数据
sparkSession.sql("select age,count(*) as num from student group by age")
.show()
sparkSession.stop()
}
}

image-20230328232442914

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.imooc.java.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* 需求:使用sql操作DataFrame
* Created by xuwei
*/
public class DataFrameSqlJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
//创建SparkSession对象,里面包含SparkContext和SqlContext
SparkSession sparkSession = SparkSession.builder()
.appName("DataFrameSqlJava")
.config(conf)
.getOrCreate();
Dataset<Row> stuDf = sparkSession.read().json("D:\\student.json");
//将Dataset<Row>注册为一个临时表
stuDf.createOrReplaceTempView("student");
//使用sql查询临时表中的数据
sparkSession.sql("select age,count(*) as num from student group by ag
.show();
sparkSession.stop();
}
}

RDD转换为DataFrame(包含DataFrame转RDD)

1
2
3
4
5
6
7
8
为什么要将RDD转换为DataFrame?

在实际工作中我们可能会先把hdfs上的一些日志数据加载进来,然后进行一些处理,最终变成结构化的数据,希望对这些数据做一些统计分析,当然了我们可以使用spark中提供的transformation算子来实现,只不过会有一些麻烦,毕竟是需要写代码的,如果能够使用sql实现,其实是更加方便的。

所以可以针对我们前面创建的RDD,将它转换为DataFrame,这样就可以使用dataFrame中的一些算子或者直接写sql来操作数据了。
Spark SQL支持这两种方式将RDD转换为DataFrame
1. 反射方式
2. 编程方式

反射方式

1
2
3
4
5
6
7
下面来看一下反射方式:
这种方式是使用反射来推断RDD中的元数据。
基于反射的方式,代码比较简洁,也就是说当你在写代码的时候,已经知道了RDD中的元数据(已经知道RDD里面的数据长什么样子),这样的话使用反射这种方式是一种非常不错的选择。
Scala具有隐式转换的特性,所以spark sql的scala接口是支持自动将包含了case class的RDD转换为DataFrame的

下面来举一个例子
scala代码如下:
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.imooc.scala.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 需求:使用反射方式实现RDD转换为DataFrame
* Created by xuwei
*/
object RddToDataFrameByReflectScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("RddToDataFrameByReflectScala")
.config(conf)
.getOrCreate()

//获取SparkContext
val sc = sparkSession.sparkContext
val dataRDD = sc.parallelize(Array(("jack",18),("tom",20),("jessic",30)))

//基于反射直接将包含Student对象的dataRDD转换为DataFrame
//需要导入隐式转换
import sparkSession.implicits._ //不导入,toDF()不能用
val stuDf = dataRDD.map(tup=>Student(tup._1,tup._2)).toDF()

//下面就可以通过DataFrame的方式操作dataRDD中的数据了
stuDf.createOrReplaceTempView("student")
//执行sql查询
val resDf = sparkSession.sql("select name,age from student where age > 18")
//将DataFrame转化为RDD,或者直接在这里show()也行
val resRDD = resDf.rdd

//从row中取数据 ,封装成student,打印到控制台
resRDD.map(row=>Student(row(0).toString,row(1).toString.toInt))
.collect()
.foreach(println(_))
//使用row的getAs()方法,获取指定列名的值
resRDD.map(row=>Student(row.getAs[String]("name"),row.getAs[Int]("age")))
.collect()
.foreach(println(_))
sparkSession.stop()
}
}
//定义一个Student
case class Student(name: String,age: Int)

image-20230328234459220

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.imooc.java.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
/**
* 需求:使用反射方式实现RDD转换为DataFrame
* Created by xuwei
*/
public class RddToDataFrameByReflectJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
//创建SparkSession对象,里面包含SparkContext和SqlContext
SparkSession sparkSession = SparkSession.builder()
.appName("RddToDataFrameByReflectJava")
.config(conf)
.getOrCreate();
//获取SparkContext
//从sparkSession中获取的是scala中的sparkContext,所以需要转换成java中的sparkContext
JavaSparkContext sc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("jack", 18);
Tuple2<String, Integer> t2 = new Tuple2<String, Integer>("tom", 20);
Tuple2<String, Integer> t3 = new Tuple2<String, Integer>("jessic", 30);
JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1,t2,t3));
JavaRDD<Student> stuRDD = dataRDD.map(new Function<Tuple2<String, Integer>, Student>(){
@Override
public Student call(Tuple2<String, Integer> tup) throws Exception
return new Student(tup._1, tup._2);
}
});
//注意:Student这个类必须声明为public(一个文件里只能有一个public class,所以只能在另一个文件里创建),并且必须实现序列化
Dataset<Row> stuDf = sparkSession.createDataFrame(stuRDD, Student.class);

stuDf.createOrReplaceTempView("student");
//执行sql查询
Dataset<Row> resDf = sparkSession.sql("select name,age from student where age>18");
//将DataFrame转化为RDD,注意:这里需要转为JavaRDD
JavaRDD<Row> resRDD = resDf.javaRDD();
//从row中取数据,封装成student,打印到控制台
List<Student> resList = resRDD.map(new Function<Row, Student>() {
@Override
public Student call(Row row) throws Exception {
//return new Student(row.getString(0), row.getInt(1));
//通过getAs获取数据
return new Student(row.getAs("name").toString(), Integer.parseInt(row.getAs("age").toString());
}
}).collect();
for(Student stu : resList){
System.out.println(stu);
sparkSession.stop();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.imooc.java.sql;
import java.io.Serializable;

public class student implements Serializable{
private String name;
private int age;

public Student(String name,int age){
this.name=name;
this.age=age;
}
public String getName(){
return name;
}
public void setName(String name){
this.name=name;
}
public void getAge(){
return age;
}
public void setAge(){
this.age=age;
}
@Override
public String toString(){
return "Student{"+
"name='"+name+'\''+
",age="+age+
'}';
}

}
1
2
3
java类中快速为字段生成get,set方法,右键->generate->...
快速生成构造方法 右键->generate->constructor
快速生成toString()方法 右键->generate->toString

编程方式

1
2
3
4
接下来是编程的方式
这种方式是通过编程接口来创建DataFrame,你可以在程序运行时动态构建一份元数据,就是Schema,然后将其应用到已经存在的RDD上。这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能通过这种动态构建元数据的方式。

也就是说当case calss中的字段无法预先定义的时候,就只能用编程方式动态指定元数据了
scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.imooc.scala.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, Stru
import org.apache.spark.sql.{Row, SparkSession}
/**
* 需求:使用编程方式实现RDD转换为DataFrame
* Created by xuwei
*/
object RddToDataFrameByProgramScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("RddToDataFrameByProgramScala")
.config(conf)
.getOrCreate()
//获取SparkContext
val sc = sparkSession.sparkContext
// 假设这里不知道RDD的数据结构
val dataRDD = sc.parallelize(Array(("jack",18),("tom",20),("jessic",30)))

//组装rowRDD
val rowRDD = dataRDD.map(tup=>Row(tup._1,tup._2))
// dataframe=rdd+schema
//指定元数据信息【这个元数据信息就可以动态从外部获取了,比较灵活】
val schema = StructType(Array(
StructField("name",StringType,true),
StructField("age",IntegerType,true)
))

//组装DataFrame
val stuDf = sparkSession.createDataFrame(rowRDD,schema)
//下面就可以通过DataFrame的方式操作dataRDD中的数据了
stuDf.createOrReplaceTempView("student")

//执行sql查询
val resDf = sparkSession.sql("select name,age from student where age > 18")

//将DataFrame转化为RDD
val resRDD = resDf.rdd
resRDD.map(row=>(row(0).toString,row(1).toString.toInt))
.collect()
.foreach(println(_))
sparkSession.stop()
}
}
1
IDEA编程时使用一个未知的东西,alt+回车,会提示需要导入的东西

image-20230329004355828

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.imooc.java.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* 需求:使用编程方式实现RDD转换为DataFrame
* Created by xuwei
*/
public class RddToDataFrameByProgramJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
//创建SparkSession对象,里面包含SparkContext和SqlContext
SparkSession sparkSession = SparkSession.builder()
.appName("RddToDataFrameByProgramJava")
.config(conf)
.getOrCreate();
//获取SparkContext
//从sparkSession中获取的是scala中的sparkContext,所以需要转换成java中的sp
JavaSparkContext sc = JavaSparkContext.fromSparkContext(sparkSession.
Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("jack", 18);
Tuple2<String, Integer> t2 = new Tuple2<String, Integer>("tom", 20);
Tuple2<String, Integer> t3 = new Tuple2<String, Integer>("jessic", 30);
JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1,t2,t3));
//组装rowRDD
JavaRDD<Row> rowRDD = dataRDD.map(new Function<Tuple2<String, Integer>,Row>(){
@Override
public Row call(Tuple2<String, Integer> tup) throws Exception {
return RowFactory.create(tup._1, tup._2);
}
});
//指定元数据信息
ArrayList<StructField> structFieldList = new ArrayList<StructField>();
structFieldList.add(DataTypes.createStructField("name", DataTypes.StringType,true));
structFieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType,True));
StructType schema = DataTypes.createStructType(structFieldList);
//构建DataFrame
Dataset<Row> stuDf = sparkSession.createDataFrame(rowRDD, schema);

stuDf.createOrReplaceTempView("student");
//执行sql查询
Dataset<Row> resDf = sparkSession.sql("select name,age from student where age>18");
//将DataFrame转化为RDD,注意:这里需要转为JavaRDD
JavaRDD<Row> resRDD = resDf.javaRDD();
List<Tuple2<String, Integer>> resList = resRDD.map(new Function<Row, Tuple2<String,Intege>>(){
@Override
public Tuple2<String, Integer> call(Row row) throws Exception {
return new Tuple2<String, Integer>(row.getString(0), row.getInt(1));
}
}).collect();

for(Tuple2<String,Integer> tup : resList){
System.out.println(tup);
}
sparkSession.stop();
}
}

load和save操作

1
2
3
4
5
6
7
8
对于Spark SQL的DataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的load和save操作。(不使用这两个方法,使用TextFile(),再转成DataFrame操作,再转RDD,saveAsText()也可以)

load操作主要用于加载数据,创建出DataFrame;
save操作,主要用于将DataFrame中的数据保存到文件中。

我们前面操作json格式的数据的时候好像没有使用load方法,而是直接使用的json方法,这是什么特殊用法吗?
查看json方法的源码会发现,它底层调用的是format和load方法
def json(paths: String*): DataFrame = format("json").load(paths : _*)
1
2
3
4
我们如果使用原始的format和load方法加载数据,
此时如果不指定format,则默认读取的数据源格式是parquet,也可以手动指定数据源格式。Spark SQL内置了一些常见的数据源类型,比如json, parquet, jdbc, orc, csv, text

通过这个功能,就可以在不同类型的数据源之间进行转换了。

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.imooc.scala.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 需求:load和save的使用
* Created by xuwei
*/
object LoadAndSaveOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("LoadAndSaveOpScala")
.config(conf)
.getOrCreate()
//读取数据(这种方式与前面讲的创建dataframe是一样的,sparkSession.read.json("xxx"))
val stuDf = sparkSession.read
.format("json")
.load("D:\\student.json")
//保存数据
stuDf.select("name","age")
.write
.format("csv")
.save("hdfs://bigdata01:9000/out-save001")
sparkSession.stop()
}
}

image-20230329134033719

image-20230329134053438

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.imooc.java.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* 需求:load和save的使用
* Created by xuwei
*/
public class LoadAndSaveOpJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
//创建SparkSession对象,里面包含SparkContext和SqlContext
SparkSession sparkSession = SparkSession.builder()
.appName("LoadAndSaveOpJava")
.config(conf)
.getOrCreate();
//读取数据
Dataset<Row> stuDf = sparkSession.read()
.format("json")
.load("D:\\student.json");
//保存数据
stuDf.select("name","age")
.write()
.format("csv")
.save("hdfs://bigdata01:9000/out-save002");
sparkSession.stop();
}
}

SaveMode

1
2
3
4
5
6
7
8
Spark SQL对于save操作,提供了不同的save mode。
主要用来处理,当目标位置已经有数据时应该如何处理。save操作不会执行锁操作,并且也不是原子的,因此是有一定风险出现脏数据的。

SaveMode 解释
SaveMode.ErrorIfExists (默认) 如果目标位置已经存在数据,那么抛出一个异常
SaveMode.Append 如果目标位置已经存在数据,那么将数据追加进去
SaveMode.Overwrite 如果目标位置已经存在数据,那么就将已经存在的数据删除,用新数据进行覆盖
SaveMode.Ignore 如果目标位置已经存在数据,那么就忽略,不做任何操作
1
2
3
4
5
6
7
在LoadAndSaveOpScala中增加SaveMode的设置,重新执行,验证结果
将SaveMode设置为Append,如果目标已存在,则追加
stuDf.select("name","age")
.write
.format("csv")
.mode(SaveMode.Append)//追加
.save("hdfs://bigdata01:9000/out-save001")

image-20230329135505416

作业

image-20230329135637838

内置函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Spark中提供了很多内置的函数
种类 函数
聚合函数 avg, count, countDistinct, first, last, max, mean, min, sum,

集合函数 array_contains, explode, size

日期/时间函数 datediff, date_add, date_sub, add_months, last_day, next_day,

数学函数 abs, ceil, floor, round

混合函数 if, isnull, md5, not, rand, when

字符串函数 concat, get_json_object, length, reverse, split, upper

窗口函数 denseRank, rank, rowNumber
1
2
其实这里面的函数和hive中的函数是类似的
注意:SparkSQL中的SQL函数文档不全,其实在使用这些函数的时候,大家完全可以去查看hive中sql的文档,使用的时候都是一样的。

实战:TopN主播统计

1
2
3
4
5
6

在前面讲Spark core的时候我们讲过一个案例,TopN主播统计,计算每个大区当天金币收入TopN的主播,之前我们使用spark中的transformation算子去计算,实现起来还是比较麻烦的,代码量相对来说比较多,下面我们就使用咱们刚学习的Spark sql去实现一下,你会发现,使用sql之后确实简单多了。

回顾以下我们的两份原始数据,数据都是json格式的
video_info.log 主播的开播记录,其中包含主播的id:uid、直播间id:vid 、大区:area、视频开播时长:length、增加粉丝数量:follow等信息
gift_record.log 用户送礼记录,其中包含送礼人id:uid,直播间id:vid,礼物id:good_id,金币数量:gold 等信息
1
2
最终需要的结果是这样的
US 8407173251015:180,8407173251012:70,8407173251001:60
1
2
3
4
5
分析一下具体步骤
1. 直接使用SparkSession中的load方式加载json的数据
2. 对这两份数据注册临时表
3. 执行sql计算TopN主播
4. 使用foreach将结果打印到控制台

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.imooc.scala.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 需求:计算TopN主播
* 1:直接使用sparkSession中的load方式加载json数据
* 2:对这两份数据注册临时表
* 3:执行sql计算TopN主播
* 4:使用foreach将结果打印到控制台
* Created by xuwei
*/
object TopNAnchorScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("LoadAndSaveOpScala")
.config(conf)
.getOrCreate()

//1:直接使用sparkSession中的load方式加载json数据
val videoInfoDf = sparkSession.read.json("D:\\video_info.log")
val giftRecordDf = sparkSession.read.json("D:\\gift_record.log")

//2:对这两份数据注册临时表
videoInfoDf.createOrReplaceTempView("video_info")
giftRecordDf.createOrReplaceTempView("gift_record")

//3:执行sql计算TopN主播
val sql ="select "+
"t4.area, "+
"concat_ws(',',collect_list(t4.topn)) as topn_list "+
"from( "+
"select "+
"t3.area,concat(t3.uid,':',cast(t3.gold_sum_all as int)) as topn "+
"from( "+
"select "+
"t2.uid,t2.area,t2.gold_sum_all,row_number() over (partition by area orde
"from( "+
"select "+
"t1.uid,max(t1.area) as area,sum(t1.gold_sum) as gold_sum_all "+
"from( "+
"select "+
"vi.uid,vi.vid,vi.area,gr.gold_sum "+
"from "+
"video_info as vi "+
"join "+
"(select "+
"vid,sum(gold) as gold_sum "+
"from "+
"gift_record "+
"group by vid "+
")as gr "+
"on vi.vid = gr.vid "+
") as t1 "+
"group by t1.uid "+
") as t2 "+
")as t3 "+
"where t3.num <=3 "+
") as t4 "+
"group by t4.area "

val resDf = sparkSession.sql(sql)

//4:使用foreach将结果打印到控制台
resDf.rdd.foreach(row=>println(row.getAs[String]("area")+"\t"+row.getAs[String]))
sparkSession.stop()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
select
t4.area,
concat_ws(',',collect_list(t4.topn)) as topn_list
from(
select
t3.area,concat(t3.uid,':',cast(t3.gold_sum_all as int)) as topn // cast() as int 是因为结果中有xxx.0
from(
select
t2.uid,t2.area,t2.gold_sum_all,row_number() over (partition by area order by gold_sum_all desc) as num
from(
select
t1.uid,max(t1.area) as area,sum(t1.gold_sum) as gold_sum_all
from(
select
vi.uid,vi.vid,vi.area,gr.gold_sum
from
video_info as vi
join
(select
vid,sum(gold) as gold_sum
from
gift_record
group by vid
)as gr
on vi.vid = gr.vid
) as t1
group by t1.uid
) as t2
)as t3
where t3.num <=3
) as t4
group by t4.area
1
t3再select的结果

image-20230330150555300

image-20230330150618987

1
t4的prinln(_)是这样时

image-20230330151021942

1
最终结果

image-20230330151135975

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.imooc.java.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* 需求:计算TopN主播
* 1:直接使用sparkSession中的load方式加载json数据
* 2:对这两份数据注册临时表
* 3:执行sql计算TopN主播
* 4:使用foreach将结果打印到控制台
* Created by xuwei
*/
public class TopNAnchorJava {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
//创建SparkSession对象,里面包含SparkContext和SqlContext
SparkSession sparkSession = SparkSession.builder()
.appName("TopNAnchorJava")
.config(conf)
.getOrCreate();

//1:直接使用sparkSession中的load方式加载json数据
Dataset<Row> videoInfoDf = sparkSession.read().json("D:\\video_info.log");
Dataset<Row> giftRecordDf = sparkSession.read().json("D:\\gift_record.log");
//2:对这两份数据注册临时表
videoInfoDf.createOrReplaceTempView("video_info");
giftRecordDf.createOrReplaceTempView("gift_record");
//3:执行sql计算TopN主播
String sql = "select "+
"t4.area, "+
"concat_ws(',',collect_list(t4.topn)) as topn_list "+
"from( "+
"select "+
"t3.area,concat(t3.uid,':',cast(t3.gold_sum_all as int)) as t
"from( "+
"select "+
"t2.uid,t2.area,t2.gold_sum_all,row_number() over (partition
"from( "+
"select "+
"t1.uid,max(t1.area) as area,sum(t1.gold_sum) as gold_sum_all
"from( "+
"select "+
"vi.uid,vi.vid,vi.area,gr.gold_sum "+
"from "+
"video_info as vi "+
"join "+
"(select "+
"vid,sum(gold) as gold_sum "+
"from "+
"gift_record "+
"group by vid "+
")as gr "+
"on vi.vid = gr.vid "+
") as t1 "+
"group by t1.uid "+
") as t2 "+
")as t3 "+
"where t3.num <=3 "+
") as t4 "+
"group by t4.area ";
Dataset<Row> resDf = sparkSession.sql(sql);
//4:使用foreach将结果打印到控制台
resDf.javaRDD().foreach(new VoidFunction<Row>() {
@Override
public void call(Row row) throws Exception {
System.out.println(row.getString(0)+"\t"+row.getString(1));
}
});
sparkSession.stop();
}
}
1
2
3
4
代码执行结果如下:
CN 8407173251008:120,8407173251003:60,8407173251014:50
ID 8407173251005:160,8407173251010:140,8407173251002:70
US 8407173251015:180,8407173251012:70,8407173251001:60

集群执行

1
新建一个object:TopNAnchorClusterScala,修改代码,将任务的输出数据保存到hdfs上面
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.imooc.scala.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 需求:计算TopN主播
* 1:直接使用sparkSession中的load方式加载json数据
* 2:对这两份数据注册临时表
* 3:执行sql计算TopN主播
* 4:使用foreach将结果打印到控制台
* Created by xuwei
*/
object TopNAnchorClusterScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("LoadAndSaveOpScala")
.config(conf)
.getOrCreate()
//1:直接使用sparkSession中的load方式加载json数据
val videoInfoDf = sparkSession.read.json("hdfs://bigdata01:9000/video_inf
val giftRecordDf = sparkSession.read.json("hdfs://bigdata01:9000/gift_rec
//2:对这两份数据注册临时表
videoInfoDf.createOrReplaceTempView("video_info")
giftRecordDf.createOrReplaceTempView("gift_record")
//3:执行sql计算TopN主播
val sql ="select "+
"t4.area, "+
"concat_ws(',',collect_list(t4.topn)) as topn_list "+
"from( "+
"select "+
"t3.area,concat(t3.uid,':',cast(t3.gold_sum_all as int)) as topn "+
"from( "+
"select "+
"t2.uid,t2.area,t2.gold_sum_all,row_number() over (partition by area orde
"from( "+
"select "+
"t1.uid,max(t1.area) as area,sum(t1.gold_sum) as gold_sum_all "+
"from( "+
"select "+
"vi.uid,vi.vid,vi.area,gr.gold_sum "+
"from "+
"video_info as vi "+
"join "+
"(select "+
"vid,sum(gold) as gold_sum "+
"from "+
"gift_record "+
"group by vid "+
")as gr "+
"on vi.vid = gr.vid "+
") as t1 "+
"group by t1.uid "+
") as t2 "+
")as t3 "+
"where t3.num <=3 "+
") as t4 "+
"group by t4.area "
val resDf = sparkSession.sql(sql)
//4:使用foreach将结果打印到控制台
resDf.rdd
.map(row=>row.getAs[String]("area")+"\t"+row.getAs[String]("topn_list")
.saveAsTextFile("hdfs://bigdata01:9000/out-topn")
sparkSession.stop()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
修改pom中依赖的配置,全部设置为provided

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>
</dependencies>
1
2
3
4
5
针对spark-core和spark-sql在打包的时候是不需要的,针对fastjson有的spark job是需要的,不过建议
在这设置为provided,打包的时候不要打进去,在具体使用的时候可以在spark-submit脚本中通过–jar
来动态指定这个jar包,最好把这个jar包上传到hdfs上面统一管理和维护。
编译打包,上传到bigdta04上的 /data/soft/sparkjars 目录
创建spark-submit脚本
1
2
3
4
5
6
7
8
9
10
[root@bigdata04 sparkjars]# vi topnJob.sh 
spark-submit \
--class com.imooc.scala.sql.TopNAnchorClusterScala \
--master yarn \
--deploy-mode cluster \
--executor-memory 1g \
--num-executors 5 \
--executor-cores 2 \
--conf "spark.default.parallelism=10" \
db_spark-1.0-SNAPSHOT-jar-with-dependencies.jar
1
2
提交任务
[root@bigdata04 sparkjars]# sh -x topnJob.sh

image-20230329155709778


本文标题:大数据开发工程师-第十一周 SparkSql-5

文章作者:TTYONG

发布时间:2023年03月28日 - 15:03

最后更新:2023年04月23日 - 22:04

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E4%B8%80%E5%91%A8-SparkSql-5.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%