SparkSQL 程序运行流程
DataSource 数据源操作
Sparksql 提供了丰富的数据源操作,可以方便的导入各种各样的数据包括:json,text,orc,parquet,csv,jdbc,hive,libsvm
///\构建 sparksession
SparkConf sconf = new SparkConf().setAppName("sparktest").setMaster("local[*]"); SparkSession spark = SparkSession.builder().config(sconf).getOrCreate();
1、 默认数据源:parquet
,直接采用 load()
加载
Dataset parquet_users = spark.read().load("data/resources/users.parquet");
2、 指定格式 json,orc,text,parquet,csv
加载-保存:
Spark SQL 可以自动的识别 json 数据集的 schema 并且加载为 DataFrame。通过 SQLContext.read.json()来转换字符串类型 RDD 或 json 文件。这里的 json 不是传统的 json 文件内容,每一行要包含分隔,独立有效 json 对象。因此常规的多行 json 文件通常会失败。
可以使用toDF("列名1","列名2","列名3"...)为导入的数据指定列名,否则会使用默认列名,列的个数和列名的个数必须相同
//json spark.read().json("data/resources/people.json") //加载 .write().json("path"); //保存 //csv spark.read().csv("") .write().csv("path"); //parquet spark.read().parquet("") .write().parquet("path"); //orc spark.read().orc("") .write().orc("path"); //text spark.read().text("data/resources/people.txt") .write().text("path");
3、 JDBC 数据源加载-保存
//jdbc_1:第一种加载保存方式 Dataset jdbcDF = spark.read() .format("jdbc") .option("url", "jdbc:mysql://ip:3306/testDB") .option("driver", "com.mysql.jdbc.driver") .option("dbtable", "tablename") .option("user", "username") .option("password", "password") .load(); jdbcDF.select("column") .as(Encoders.STRING()) .write() .mode(SaveMode.Overwrite) .format("jdbc") .option("url", "jdbc:mysql://ip:3306/testDB") .option("driver", "com.mysql.jdbc.driver") .option("dbtable", "tablename") .option("user", "username") .option("password", "password") .save(); //jdbc_2:第二种加载保存方式 Properties connectionProperties = new Properties(); connectionProperties.put("user", "username"); connectionProperties.put("password", "password"); Dataset jdbcDF2 = spark.read() .jdbc("jdbc:mysql://ip:3306/testDB", "tablename", connectionProperties); jdbcDF2.select("column") .sort() .as(Encoders.STRING()) .write() .jdbc("jdbc:mysql://ip:3306/testDB", "tablename", connectionProperties);
4、 Hive 表数据加载:
.config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport()
开启以上两个配置就可以操作 Hive 表了,(前提是,sqpark-sql 命令行能够操作 hive 表,必须将 hive 的 hive-site.xml 配置文件放到 spark 的 conf 目录下)
String warehouseLocation = new File("spark-warehouse").getAbsolutePath(); SparkSession spark = SparkSession .builder() .appName("Java Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate(); spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive"); spark.sql("LOAD DATA LOCAL INPATH 'resources/kv1.txt' INTO TABLE src"); //可以直接查询数据,过滤,聚合使用sql中的聚合函数 //也可以将数据查询出来转换成DS/DF,使用DSL,
SparkSQL 数据操作:
SparkSQl 中提供了两种操作数据的方式,SQL 和 DSL,SQL 操作大家很熟悉,也很方便,提供 DSL 是为了弥补 SQL 操作功能或者性能上的不足,提供了更加强大操作组合,但是他们的底层都是一样的。
一、DSL 操作数据
//people.json {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
1、加载数据
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sqlVsDsl"); SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); //加载数据构建DF Dataset peoplesDF = spark.read().format("json").load("data/resources/people.json");
2、输出 Schema
peoplesDF.printSchema(); //自动识别了Json格式数据的Schema root |-- age: long (nullable = true) |-- name: string (nullable = true)
3、查询某列:select
peoplesDF.select("name","age").show(); +-------+----+ | name| age| +-------+----+ |Michael|null| | Andy| 30| | Justin| 19| |Michael|null| | Andy| 30| | Justin| 19|
4、条件过滤:filter
使用col()要导入:import static org.apache.spark.sql.functions.col;
peoplesDF.filter("age is NULL OR age>20").show(); // /* +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| |null|Michael| | 30| Andy| +----+-------+ */ peoplesDF.filter("age <20").show(); //peoplesDF.col("age").$less(20) +---+------+ |age| name| +---+------+ | 19|Justin| | 19|Justin| +---+------+ //可以看出比较大小,null值并不参与 peoplesDF.filter(peoplesDF.col("age").$eq$eq$eq(30)).show(); +---+----+ |age|name| +---+----+ | 30|Andy| | 30|Andy| +---+----+
5、去重复项:distinct
peoplesDF.distinct().show(); +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
6、按列排序:orderby/sort
peoplesDF.orderBy("age").show(); +----+-------+ | age| name| +----+-------+ |null|Michael| |null|Michael| | 19| Justin| | 19| Justin| | 30| Andy| | 30| Andy| +----+-------+ peoplesDF.sort("name","age").show(); +----+-------+ | age| name| +----+-------+ | 30| Andy| | 30| Andy| | 19| Justin| | 19| Justin| |null|Michael| |null|Michael| +----+-------+
7、分组聚合:groupBy().sum()/max/min..
//按name分组,求每组的age之和 peoplesDF.groupBy("name").sum("age").show(); +-------+--------+ | name|sum(age)| +-------+--------+ |Michael| null| | Andy| 60| | Justin| 38| +-------+--------+
7、分组聚合:groupBy().agg()
peoplesDF.groupBy("name").agg(col("name"),max("age"),sum("age")).show(); +-------+-------+--------+--------+ | name| name|max(age)|sum(age)| +-------+-------+--------+--------+ |Michael|Michael| null| null| | Andy| Andy| 30| 60| | Justin| Justin| 19| 38| +-------+-------+--------+--------+
8、对某列进行操作:col()
peoplesDF.select(peoplesDF.col("name"),peoplesDF.col("age").$plus(1)).show(); +-------+---------+ | name|(age + 1)| +-------+---------+ |Michael| null| | Andy| 31| | Justin| 20| |Michael| null| | Andy| 31| | Justin| 20| +-------+---------+ peoplesDF.select(peoplesDF.col("name"),peoplesDF.col("age").$less(20)).show(); +-------+----------+ | name|(age < 20)| +-------+----------+ |Michael| null| | Andy| false| | Justin| true| |Michael| null| | Andy| false| | Justin| true| +-------+----------+
9、连接 Join:join()
Default
inner
. Must be one of:
inner
,cross
,outer
,full
,full_outer
,left
,left_outer
,right
,right_outer
,left_semi
,left_anti
.
peoplesDF.join(peoplesDF,"age").show() +---+------+------+ |age| name| name| +---+------+------+ | 30| Andy| Andy| | 30| Andy| Andy| | 19|Justin|Justin| | 19|Justin|Justin| | 30| Andy| Andy| | 30| Andy| Andy| | 19|Justin|Justin| | 19|Justin|Justin| +---+------+------+ peoplesDF.join(peoplesDF.limit(2),"age").show() +---+----+----+ |age|name|name| +---+----+----+ | 30|Andy|Andy| | 30|Andy|Andy| +---+----+----+ Dataset df2 = peoplesDF.limit(2); peoplesDF.join(df2,peoplesDF.col("age").equalTo(df2.col("age")),"left_outer").show(); +----+-------+----+----+ | age| name| age|name| +----+-------+----+----+ |null|Michael|null|null| | 30| Andy| 30|Andy| | 19| Justin|null|null| |null|Michael|null|null| | 30| Andy| 30|Andy| | 19| Justin|null|null| +----+-------+----+----+
10、去除空值:na.drop()
//去除包含空值的列 peoplesDF.na().drop().show(); +---+------+ |age| name| +---+------+ | 30| Andy| | 19|Justin| | 30| Andy| | 19|Justin|
二 、Sql 操作数据
1、临时视图
//临时试图 peoplesDF.createOrReplaceTempView("peopleView"); spark.sql("SELECT * from peopleView t WHERE t.age>=19 ").show();
2、全局视图
//全局视图 peoplesDF.createGlobalTempView("peopleGlobalView"); spark.sql("SELECT * from global_temp.peopleGlobalView t WHERE t.age>=19 ").show();
3、临时表
//临时表 peoplesDF.registerTempTable("tables"); spark.sql("SELECT * from tables t WHERE t.age>=19 ").show(); +---+------+ |age| name| +---+------+ | 30| Andy| | 19|Justin| | 30| Andy| | 19|Justin| +---+------+
hive 数据操作
在以前的 API 中有 HiveContext 这个接口可以开启对 Hive 数据库的操作,现在统一的接口 Sparksession 同样能够开启对 Hive 的操作,
import java.io.File; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public static class Record implements Serializable { private int key; private String value; public int getKey() { return key; } public void setKey(int key) { this.key = key; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } } // warehouseLocation points to the default location for managed databases and tables String warehouseLocation = new File("spark-warehouse").getAbsolutePath(); SparkSession spark = SparkSession .builder() .appName("Java Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate(); spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive"); spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL spark.sql("SELECT * FROM src").show(); // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... // Aggregation queries are also supported. spark.sql("SELECT COUNT(*) FROM src").show(); // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // The results of SQL queries are themselves DataFrames and support all normal functions. Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key"); // The items in DataFrames are of type Row, which lets you to access each column by ordinal. Dataset<String> stringsDS = sqlDF.map( (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1), Encoders.STRING()); stringsDS.show(); // +--------------------+ // | value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ... // You can also use DataFrames to create temporary views within a SparkSession. List<Record> records = new ArrayList<>(); for (int key = 1; key < 100; key++) { Record record = new Record(); record.setKey(key); record.setValue("val_" + key); records.add(record); } Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class); recordsDF.createOrReplaceTempView("records"); // Queries can then join DataFrames data with data stored in Hive. spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show(); // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // ...
hive 数据保存
- partitionBy(clomun) 分区表,按列分区
- bucketBy(number,clomun) 桶表
spark.sql("SELECT * from tables t WHERE t.age>=19 ") .write() .sortBy("age") .mode(SaveMode.Overwrite) .partitionBy("age") .bucketBy(10,"name") .saveAsTable("hivetable");
saveAsTable
,将数据保存到 Hive 表中,sparksession 中未配置 spar-warehouse,系统默认在本地创建,保存数据如下:创建了 spark-warehouse,按 ag 分区,默认使用了 snappy 压缩。存储为 parquet 格式。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于