深度好文SparkSQL编程指南

北京知名皮肤病医院 http://m.39.net/pf/a_4655748.html

本文将讨论Spark的另外一个重要模块--SparkSQL,SparkSQL是在Shark的基础之上构建的,于年5月发布。从名称上可以看出,该模块是Spark提供的关系型操作API,实现了SQL-on-Spark的功能。对于一些熟悉SQL的用户,可以直接使用SQL在Spark上进行复杂的数据处理。通过本文,你可以了解到:

SparkSQL简介

DataFrameAPIDataSetAPI

CatalystOptimizer优化器

SparkSQL基本操作

SparkSQL的数据源

RDD与DataFrame相互转换

Thriftserver与SparkSQLCLI

SparkSQL简介

SparkSQL是Spark的其中一个模块,用于结构化数据处理。与基本的SparkRDDAPI不同,SparkSQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息,SparkSQL会使用这些额外的信息来执行额外的优化。使用SparkSQL的方式有很多种,包括SQL、DataFrameAPI以及DatasetAPI。值得注意的是,无论使用何种方式何种语言,其执行引擎都是相同的。实现这种统一,意味着开发人员可以轻松地在不同的API之间来回切换,从而使数据处理更加地灵活。

DataFrameAPI

DataFrame代表一个不可变的分布式数据集合,其核心目的是让开发者面对数据处理时,只关心要做什么,而不用关心怎么去做,将一些优化的工作交由Spark框架本身去处理。DataFrame是具有Schema信息的,也就是说可以被看做具有字段名称和类型的数据,类似于关系型数据库中的表,但是底层做了很多的优化。创建了DataFrame之后,就可以使用SQL进行数据处理。

用户可以从多种数据源中构造DataFrame,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。DataFrameAPI支持Scala,Java,Python和R,在Scala和Java中,row类型的DataSet代表DataFrame,即Dataset[Row]等同于DataFrame。

DataSetAPI

DataSet是Spark1.6中添加的新接口,是DataFrame的扩展,它具有RDD的优点(强类型输入,支持强大的lambda函数)以及SparkSQL的优化执行引擎的优点。可以通过JVM对象构建DataSet,然后使用函数转换(map,flatMapfilter)。值得注意的是,DatasetAPI在Scala和Java中可用,Python不支持DatasetAPI。

另外,DataSetAPI可以减少内存的使用,由于Spark框架知道DataSet的数据结构,因此在持久化DataSet时可以节省很多的内存空间。

在Catalyst中,存在两种类型的计划:

逻辑计划(LogicalPlan):定义数据集上的计算,尚未定义如何去执行计算。每个逻辑计划定义了一系列的用户代码所需要的属性(查询字段)和约束(where条件),但是不定义该如何执行。具体如下图所示:

物理计划(PhysicalPlan):物理计划是从逻辑计划生成的,定义了如何执行计算,是可执行的。举个栗子:逻辑计划中的JOIN会被转换为物理计划中的sortmergeJOIN。需要注意,Spark会生成多个物理计划,然后选择成本最低的物理计划。具体如下图所示:

在SparkSQL中,所有的算子操作会被转换成AST(abstractsyntaxtree,抽象语法树),然后将其传递给Catalyst优化器。该优化器是在Scala的函数式编程基础会上构建的,Catalyst支持基于规则的(rule-based)和基于成本的(cost-based)优化策略。

SparkSQL的查询计划包括4个阶段(见下图):

1.分析

2.逻辑优化

3.物理计划

4.生成代码,将查询部分编译成Java字节码

注意:在物理计划阶段,Catalyst会生成多个计划,并且会计算每个计划的成本,然后比较这些计划的成本的大小,即基于成本的策略。在其他阶段,都是基于规则的的优化策略。

分析

UnresolvedLogicalplan--Logicalplan。SparkSQL的查询计划首先起始于由SQL解析器返回的AST,或者是由API构建的DataFrame对象。在这两种情况下,都会存在未处理的属性引用(某个查询字段可能不存在,或者数据类型错误),比如查询语句:SELECTcolFROMsales,关于字段col的类型,或者该字段是否是一个有效的字段,只有等到查看该sales表时才会清楚。当不能确定一个属性字段的类型或者没能够与输入表进行匹配时,称之为未处理的。SparkSQL使用Catalyst的规则以及Catalog对象(能够访问数据源的表信息)来处理这些属性。首先会构建一个UnresolvedLogicalPlan树,然后作用一系列的规则,最后生成LogicalPlan。

逻辑优化

Logicalplan--OptimizedLogicalPlan。逻辑优化阶段使用基于规则的优化策略,比如谓词下推、投影裁剪等。经过一些列优化过后,生成优化的逻辑计划OptimizedLogicalPlan。

物理计划

OptimizedLogicalPlan--physicalPlan。在物理计划阶段,SparkSQL会将优化的逻辑计划生成多个物理执行计划,然后使用CostModel计算每个物理计划的成本,最终选择一个物理计划。在这个阶段,如果确定一张表很小(可以持久化到内存),SparkSQL会使用broadcastjoin。

需要注意的是,物理计划器也会使用基于规则的优化策略,比如将投影、过滤操作管道化一个Spark的map算子。此外,还会将逻辑计划阶段的操作推到数据源端(支持谓词下推、投影下推)。

代码生成

查询优化的最终阶段是生成Java字节码,使用Quasiquotes来完成这项工作的。

经过上面的分析,对CatalystOptimizer有了初步的了解。关于Spark的其他组件是如何与CatalystOptimizer交互的呢?具体如下图所示:

如上图所示:MLPipelines,Structuredstreaming以及GraphFrames都使用了DataFrame/DatasetAPIs,并且都得益于Catalystoptimiser。

QuickStart

创建SparkSession

SparkSession是Dataset与DataFrameAPI的编程入口,从Spark2.0开始支持。用于统一原来的HiveContext和SQLContext,为了兼容两者,仍然保留这两个入口。通过一个SparkSession入口,提高了Spark的易用性。

创建DataFrame

创建完SparkSession之后,可以使用SparkSession从已经存在的RDD、Hive表或者其他数据源中创建DataFrame。

DataFrame基本操作

创建完DataFrame之后,可以对其进行一些列的操作。

在程序中使用SQL查询

上面的操作使用的是DSL(domain-specificlanguage)方式,还可以直接使用SQL对DataFrame进行操作。

GlobalTemporaryView

上面使用的是Temporaryviews的方式,该方式是SparkSession范围的。如果将创建的view可以在所有session之间共享,可以使用GlobalTemporaryView的方式创建view。

创建DataSet

DataSet与RDD很类似,但是,RDD使用的Java的序列化器或者Kyro序列化,而DataSet使用的是Encoder对在网络间传输的对象进行序列化的。

SparkSQL支持两种不同的方式将RDD转换为DataFrame。第一种是使用反射来推断包含特定类型对象的RDD的模式,这种基于反射的方式可以提供更简洁的代码,如果在编写Spark应用程序时,已经明确了schema,可以使用这种方式。第二种方式是通过可编程接口来构建schema,然后将其应用于现有的RDD。此方式编写的代码更冗长,此种方式创建的DataFrame,直到运行时才知道该DataFrame的列及其类型。

通过反射的方式

SparkSQL的Scala接口支持自动将包含样例类的RDD转换为DataFrame。样例类定义表的schema。通过反射读取样例类的参数名称,并映射成column的名称。

保存模式

保存为持久化表

DataFrame可以被保存为Hive的持久化表,值得注意的是,这种方式并不依赖与Hive的部署,也就是说Spark会使用Derby创建一个默认的本地Hivemetastore,与createOrReplaceTempView不同,该方式会直接将结果物化。

对于基于文件的数据源(text,parquet,json等),在保存的时候可以指定一个具体的路径,比如df.write.option("path","/some/path").saveAsTable("t")(存储在指定路径下的文件格式为parquet)。当表被删除时,自定义的表的路径和表数据不会被移除。如果没有指定具体的路径,spark默认的是warehouse的目录(/user/hive/warehouse),当表被删除时,默认的表路径也会被删除。

JDBC数据源

SparkSQL还包括一个可以使用JDBC从其他数据库读取数据的数据源。与使用JdbcRDD相比,应优先使用此功能。这是因为结果作为DataFrame返回,它们可以在SparkSQL中轻松处理或与其他数据源连接。JDBC数据源也更易于使用Java或Python,因为它不需要用户提供ClassTag。

可以使用DataSourcesAPI将远程数据库中的表加载为DataFrame或SparkSQL临时视图。用户可以在数据源选项中指定JDBC连接属性。user并且password通常作为用于登录数据源的连接属性提供。除连接属性外,Spark还支持以下不区分大小写的选项。

SparkSQL集成Hive

SparkSQL还支持读取和写入存储在ApacheHive中的数据。但是,由于Hive具有大量依赖项,因此这些依赖项不包含在默认的Spark发布包中。如果可以在类路径上找到Hive依赖项,Spark将自动加载它们。请注意,这些Hive依赖项也必须存在于所有工作节点(workernodes)上,因为它们需要访问Hive序列化和反序列化库(SerDes)才能访问存储在Hive中的数据。

将hive-site.xml,core-site.xml以及hdfs-site.xml文件放在conf/下。

在使用Hive时,必须实例化一个支持Hive的SparkSession,包括连接到持久性HiveMetastore,支持Hive的序列化、反序列化(serdes)和Hive用户定义函数。没有部署Hive的用户仍可以启用Hive支持。如果未配置hive-site.xml,则上下文(context)会在当前目录中自动创建metastore_db,并且会创建一个由spark.sql.warehouse.dir配置的目录,其默认目录为spark-warehouse,位于启动Spark应用程序的当前目录中。请注意,自Spark2.0.0以来,该在hive-site.xml中的hive.metastore.warehouse.dir属性已被标记过时(deprecated)。使用spark.sql.warehouse.dir用于指定warehouse中的默认位置。可能需要向启动Spark应用程序的用户授予写入的权限。

下面的案例为在本地运行(为了方便查看打印的结果),运行结束之后会发现在项目的目录下E:\IdeaProjects\myspark创建了spark-warehouse和metastore_db的文件夹。可以看出没有部署Hive的用户仍可以启用Hive支持,同时也可以将代码打包,放在集群上运行。

Thriftserver与SparkSQLCLI

可以使用JDBC/ODBC或者命令行访问SparkSQL,通过这种方式,用户可以直接使用SQL运行查询,而不用编写代码。

ThriftJDBC/ODBCserver

ThriftJDBC/ODBCserver与Hive的HiveServer2向对应,可以使用Beeline访问JDBC服务器。在Spark的sbin目录下存在start-thriftserver.sh脚本,使用此脚本启动JDBC/ODBC服务器:

./sbin/start-thriftserver.sh

使用beeline访问JDBC/ODBC服务器,Beeline会要求提供用户名和密码,在非安全模式下,只需输入用户名和空白密码即可

beeline!connectjdbc:hive2://localhost:

image

SparkSQLCLI

SparkSQLCLI是在本地模式下运行HiveMetastore服务并执行从命令行输入的查询的便捷工具。请注意,SparkSQLCLI无法与ThriftJDBC服务器通信。

要启动SparkSQLCLI,只需要在Spark的bin目录中运行以下命令:

./spark-sql

image

总结

本文主要对SparkSQL进行了阐述,主要包括SparkSQL的介绍、DataFrameDataSetAPI基本使用、CatalystOptimizer优化器的基本原理、SparkSQL编程、SparkSQL数据源以及与Hive集成、Thriftserver与SparkSQLCLI。




转载请注明:http://www.aierlanlan.com/rzdk/2307.html