剖析 spark-shell

本贴最后更新于 2349 天前,其中的信息可能已经渤澥桑田

我们首先来看看 spark-shell 到底做了什么,spark-shell 中有一段脚本内容如下:

function main() { if $cygwin; then # Workaround for issue involving JLine and Cygwin # (see http://sourceforge.net/p/jline/bugs/40/). # If you're using the Mintty terminal emulator in Cygwin, may need to set the # "Backspace sends ^H" setting in "Keys" section of the Mintty options # (see https://github.com/sbt/sbt/issues/562). stty -icanon min 1 -echo > /dev/null 2>&1 export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix" "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" stty icanon echo > /dev/null 2>&1 else export SPARK_SUBMIT_OPTS "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" fi }

在上面的脚本中,实际上市执行了 spark-submit,查看 spark-submit 代码:

if [ -z "${SPARK_HOME}" ]; then export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" fi # disable randomized hash for string in Python 3.3+ export PYTHONHASHSEED=0 exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

非常简单,执行 spark-class 并传入参数.继续查看 spark-class 脚本内容:

if [ -z "${SPARK_HOME}" ]; then export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" fi . "${SPARK_HOME}"/bin/load-spark-env.sh

执行 load-spark-env.sh 加载环境变量,稍后在讨论这个脚本

...... build_command() { "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@" printf "%d\0" $? } ...... CMD=() while IFS= read -d '' -r ARG; do CMD+=("$ARG") done < <(build_command "$@") ...... CMD=("${CMD[@]:0:$LAST}") exec "${CMD[@]}"

c 从上面可以看出执行了 org.apache.spark.launcher.Main ,继续打开 org.apache.spark.launcher.Main 查看代码

public static void main(String[] argsArray) throws Exception { checkArgument(argsArray.length > 0, "Not enough arguments: missing class name."); List<String> args = new ArrayList<>(Arrays.asList(argsArray)); String className = args.remove(0); boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND")); AbstractCommandBuilder builder; if (className.equals("org.apache.spark.deploy.SparkSubmit")) { try { builder = new SparkSubmitCommandBuilder(args); } catch (IllegalArgumentException e) { printLaunchCommand = false; System.err.println("Error: " + e.getMessage()); System.err.println(); MainClassOptionParser parser = new MainClassOptionParser(); try { parser.parse(args); } catch (Exception ignored) { // Ignore parsing exceptions. } List<String> help = new ArrayList<>(); if (parser.className != null) { help.add(parser.CLASS); help.add(parser.className); } help.add(parser.USAGE_ERROR); builder = new SparkSubmitCommandBuilder(help); } } else { builder = new SparkClassCommandBuilder(className, args); } Map<String, String> env = new HashMap<>(); List<String> cmd = builder.buildCommand(env); if (printLaunchCommand) { System.err.println("Spark Command: " + join(" ", cmd)); System.err.println("========================================"); } if (isWindows()) { System.out.println(prepareWindowsCommand(cmd, env)); } else { // In bash, use NULL as the arg separator since it cannot be used in an argument. List<String> bashCmd = prepareBashCommand(cmd, env); for (String c : bashCmd) { System.out.print(c); System.out.print('\0'); } } }

从上面的分析我们可知,spark-submit 传递给 spark-class 的参数为 org.apache.spark.deploy.SparkSubmit,所以在 org.apache.spark.launcher.Main 执行的应该是

builder = new SparkSubmitCommandBuilder(args);

设置一些参数信息

继续往下执行 builder.buildCommand(env); 查看 buildCommand 内容

@Override public List<String> buildCommand(Map<String, String> env) throws IOException, IllegalArgumentException { if (PYSPARK_SHELL.equals(appResource) && isAppResourceReq) { return buildPySparkShellCommand(env); } else if (SPARKR_SHELL.equals(appResource) && isAppResourceReq) { return buildSparkRCommand(env); } else { return buildSparkSubmitCommand(env); } }

判断启动时的哪种环境 py,shell,or submit 然后构建命令,继续查看 buildSparkSubmitCommand 函数

其中有如下代码:

... addPermGenSizeOpt(cmd); cmd.add("org.apache.spark.deploy.SparkSubmit"); cmd.addAll(buildSparkSubmitArgs()); return cmd;

好了继续查看 org.apache.spark.deploy.SparkSubmit

spark main 线程 dump 信息

"main" #1 prio=5 os_prio=31 tid=0x00007f807180c800 nid=0x1c03 runnable [0x0000700003b08000] java.lang.Thread.State: RUNNABLE at java.io.FileInputStream.read0(Native Method) at java.io.FileInputStream.read(FileInputStream.java:207) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:169) - locked <0x00000007830bf508> (a jline.internal.NonBlockingInputStream) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:137) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:246) at jline.internal.InputStreamReader.read(InputStreamReader.java:261) - locked <0x00000007830bf508> (a jline.internal.NonBlockingInputStream) at jline.internal.InputStreamReader.read(InputStreamReader.java:198) - locked <0x00000007830bf508> (a jline.internal.NonBlockingInputStream) at jline.console.ConsoleReader.readCharacter(ConsoleReader.java:2145) at jline.console.ConsoleReader.readLine(ConsoleReader.java:2349) at jline.console.ConsoleReader.readLine(ConsoleReader.java:2269) at scala.tools.nsc.interpreter.jline.InteractiveReader.readOneLine(JLineReader.scala:57) at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2.apply(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2.apply(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.InteractiveReader$.restartSysCalls(InteractiveReader.scala:44) at scala.tools.nsc.interpreter.InteractiveReader$class.readLine(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.jline.InteractiveReader.readLine(JLineReader.scala:28) at scala.tools.nsc.interpreter.ILoop.readOneLine(ILoop.scala:404) at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:413) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97) at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909) at org.apache.spark.repl.Main$.doMain(Main.scala:68) at org.apache.spark.repl.Main$.main(Main.scala:51) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Locked ownable synchronizers: - None

从堆栈中信息中我们可以看出程序的调用顺序:SparkSubmit.main => repl.Main.main => ILoop.process
ILoop.process 中如下代码:

def process(settings: Settings): Boolean = savingContextLoader { this.settings = settings createInterpreter() // sets in to some kind of reader depending on environmental cues in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true)) globalFuture = future { intp.initializeSynchronous() loopPostInit() !intp.reporter.hasErrors } loadFiles(settings) printWelcome() try loop() match { case LineResults.EOF => out print Properties.shellInterruptedString case _ => } catch AbstractOrMissingHandler() finally closeInterpreter() true }

在 process 中我们发现调用了 loadFiles 并且打印 Welcome 信息

SparkLoop 继承了 loadFiles 并且复写了 loadFiles 方法 如下:

override def loadFiles(settings: Settings): Unit = { initializeSpark() super.loadFiles(settings) }

在 loadFiles 中调度 initalizeSpark ,查看源码如下:

def initializeSpark() { intp.beQuietDuring { processLine(""" @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { org.apache.spark.repl.Main.sparkSession } else { org.apache.spark.repl.Main.createSparkSession() } @transient val sc = { val _sc = spark.sparkContext _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}")) println("Spark context available as 'sc' " + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") println("Spark session available as 'spark'.") _sc } """) processLine("import org.apache.spark.SparkContext._") processLine("import spark.implicits._") processLine("import spark.sql") processLine("import org.apache.spark.sql.functions._") replayCommandStack = Nil // remove above commands from session history. } }

从上面可以看出,如果 SparkSession 已存在,那么直接返回,否则调用 createSparkSession

最后从 SparkSession 中返回 SparkContext 查看 createSparkSession 源码

def createSparkSession(): SparkSession = { val execUri = System.getenv("SPARK_EXECUTOR_URI") conf.setIfMissing("spark.app.name", "Spark shell") // SparkContext will detect this configuration and register it with the RpcEnv's // file server, setting spark.repl.class.uri to the actual URI for executors to // use. This is sort of ugly but since executors are started as part of SparkContext // initialization in certain cases, there's an initialization order issue that prevents // this from being set after SparkContext is instantiated. conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath()) if (execUri != null) { conf.set("spark.executor.uri", execUri) } if (System.getenv("SPARK_HOME") != null) { conf.setSparkHome(System.getenv("SPARK_HOME")) } val builder = SparkSession.builder.config(conf) if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive") { if (SparkSession.hiveClassesArePresent) { // In the case that the property is not set at all, builder's config // does not have this value set to 'hive' yet. The original default // behavior is that when there are hive classes, we use hive catalog. sparkSession = builder.enableHiveSupport().getOrCreate() logInfo("Created Spark session with Hive support") } else { // Need to change it back to 'in-memory' if no hive classes are found // in the case that the property is set to hive in spark-defaults.conf builder.config(CATALOG_IMPLEMENTATION.key, "in-memory") sparkSession = builder.getOrCreate() logInfo("Created Spark session") } } else { // In the case that the property is set but not to 'hive', the internal // default is 'in-memory'. So the sparkSession will use in-memory catalog. sparkSession = builder.getOrCreate() logInfo("Created Spark session") } sparkContext = sparkSession.sparkContext Signaling.cancelOnInterrupt(sparkContext) sparkSession }

这里最后使用 SparkConf 设置一些必要的参数并且通过 Builder 创建 sparkSession 并且判断是否需要启用 Hive 的支持。

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 567 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Gitea

    Gitea 是一个开源社区驱动的轻量级代码托管解决方案,后端采用 Go 编写,采用 MIT 许可证。

    5 引用 • 16 回帖 • 1 关注
  • Wide

    Wide 是一款基于 Web 的 Go 语言 IDE。通过浏览器就可以进行 Go 开发,并有代码自动完成、查看表达式、编译反馈、Lint、实时结果输出等功能。

    欢迎访问我们运维的实例: https://wide.b3log.org

    30 引用 • 218 回帖 • 644 关注
  • 新人

    让我们欢迎这对新人。哦,不好意思说错了,让我们欢迎这位新人!
    新手上路,请谨慎驾驶!

    52 引用 • 228 回帖
  • jQuery

    jQuery 是一套跨浏览器的 JavaScript 库,强化 HTML 与 JavaScript 之间的操作。由 John Resig 在 2006 年 1 月的 BarCamp NYC 上释出第一个版本。全球约有 28% 的网站使用 jQuery,是非常受欢迎的 JavaScript 库。

    63 引用 • 134 回帖 • 735 关注
  • CloudFoundry

    Cloud Foundry 是 VMware 推出的业界第一个开源 PaaS 云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题。

    5 引用 • 18 回帖 • 192 关注
  • JetBrains

    JetBrains 是一家捷克的软件开发公司,该公司位于捷克的布拉格,并在俄国的圣彼得堡及美国麻州波士顿都设有办公室,该公司最为人所熟知的产品是 Java 编程语言开发撰写时所用的集成开发环境:IntelliJ IDEA

    18 引用 • 54 回帖
  • 职场

    找到自己的位置,萌新烦恼少。

    127 引用 • 1708 回帖
  • danl
    173 关注
  • Caddy

    Caddy 是一款默认自动启用 HTTPS 的 HTTP/2 Web 服务器。

    10 引用 • 54 回帖 • 179 关注
  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    93 引用 • 122 回帖 • 619 关注
  • Webswing

    Webswing 是一个能将任何 Swing 应用通过纯 HTML5 运行在浏览器中的 Web 服务器,详细介绍请看 将 Java Swing 应用变成 Web 应用

    1 引用 • 15 回帖 • 645 关注
  • 以太坊

    以太坊(Ethereum)并不是一个机构,而是一款能够在区块链上实现智能合约、开源的底层系统。以太坊是一个平台和一种编程语言 Solidity,使开发人员能够建立和发布下一代去中心化应用。 以太坊可以用来编程、分散、担保和交易任何事物:投票、域名、金融交易所、众筹、公司管理、合同和知识产权等等。

    34 引用 • 367 回帖
  • Eclipse

    Eclipse 是一个开放源代码的、基于 Java 的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。

    76 引用 • 258 回帖 • 628 关注
  • 游戏

    沉迷游戏伤身,强撸灰飞烟灭。

    187 引用 • 830 回帖
  • 数据库

    据说 99% 的性能瓶颈都在数据库。

    345 引用 • 754 回帖
  • 一些有用的避坑指南。

    69 引用 • 93 回帖
  • GAE

    Google App Engine(GAE)是 Google 管理的数据中心中用于 WEB 应用程序的开发和托管的平台。2008 年 4 月 发布第一个测试版本。目前支持 Python、Java 和 Go 开发部署。全球已有数十万的开发者在其上开发了众多的应用。

    14 引用 • 42 回帖 • 820 关注
  • 机器学习

    机器学习(Machine Learning)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。

    77 引用 • 37 回帖
  • Typecho

    Typecho 是一款博客程序,它在 GPLv2 许可证下发行,基于 PHP 构建,可以运行在各种平台上,支持多种数据库(MySQL、PostgreSQL、SQLite)。

    12 引用 • 67 回帖 • 445 关注
  • 学习

    “梦想从学习开始,事业从实践起步” —— 习近平

    172 引用 • 534 回帖
  • WiFiDog

    WiFiDog 是一套开源的无线热点认证管理工具,主要功能包括:位置相关的内容递送;用户认证和授权;集中式网络监控。

    1 引用 • 7 回帖 • 615 关注
  • 博客

    记录并分享人生的经历。

    273 引用 • 2388 回帖 • 3 关注
  • 前端

    前端技术一般分为前端设计和前端开发,前端设计可以理解为网站的视觉设计,前端开发则是网站的前台代码实现,包括 HTML、CSS 以及 JavaScript 等。

    246 引用 • 1338 回帖
  • Flume

    Flume 是一套分布式的、可靠的,可用于有效地收集、聚合和搬运大量日志数据的服务架构。

    9 引用 • 6 回帖 • 661 关注
  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    554 引用 • 675 回帖
  • 浅吟主题

    Jeffrey Chen 制作的思源笔记主题,项目仓库:https://github.com/TCOTC/Whisper

    1 引用 • 28 回帖 • 3 关注
  • Q&A

    提问之前请先看《提问的智慧》,好的问题比好的答案更有价值。

    10003 引用 • 45474 回帖 • 74 关注