剖析 spark-shell

本贴最后更新于 2307 天前,其中的信息可能已经渤澥桑田

我们首先来看看 spark-shell 到底做了什么,spark-shell 中有一段脚本内容如下:

function main() { if $cygwin; then # Workaround for issue involving JLine and Cygwin # (see http://sourceforge.net/p/jline/bugs/40/). # If you're using the Mintty terminal emulator in Cygwin, may need to set the # "Backspace sends ^H" setting in "Keys" section of the Mintty options # (see https://github.com/sbt/sbt/issues/562). stty -icanon min 1 -echo > /dev/null 2>&1 export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix" "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" stty icanon echo > /dev/null 2>&1 else export SPARK_SUBMIT_OPTS "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@" fi }

在上面的脚本中,实际上市执行了 spark-submit,查看 spark-submit 代码:

if [ -z "${SPARK_HOME}" ]; then export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" fi # disable randomized hash for string in Python 3.3+ export PYTHONHASHSEED=0 exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

非常简单,执行 spark-class 并传入参数.继续查看 spark-class 脚本内容:

if [ -z "${SPARK_HOME}" ]; then export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" fi . "${SPARK_HOME}"/bin/load-spark-env.sh

执行 load-spark-env.sh 加载环境变量,稍后在讨论这个脚本

...... build_command() { "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@" printf "%d\0" $? } ...... CMD=() while IFS= read -d '' -r ARG; do CMD+=("$ARG") done < <(build_command "$@") ...... CMD=("${CMD[@]:0:$LAST}") exec "${CMD[@]}"

c 从上面可以看出执行了 org.apache.spark.launcher.Main ,继续打开 org.apache.spark.launcher.Main 查看代码

public static void main(String[] argsArray) throws Exception { checkArgument(argsArray.length > 0, "Not enough arguments: missing class name."); List<String> args = new ArrayList<>(Arrays.asList(argsArray)); String className = args.remove(0); boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND")); AbstractCommandBuilder builder; if (className.equals("org.apache.spark.deploy.SparkSubmit")) { try { builder = new SparkSubmitCommandBuilder(args); } catch (IllegalArgumentException e) { printLaunchCommand = false; System.err.println("Error: " + e.getMessage()); System.err.println(); MainClassOptionParser parser = new MainClassOptionParser(); try { parser.parse(args); } catch (Exception ignored) { // Ignore parsing exceptions. } List<String> help = new ArrayList<>(); if (parser.className != null) { help.add(parser.CLASS); help.add(parser.className); } help.add(parser.USAGE_ERROR); builder = new SparkSubmitCommandBuilder(help); } } else { builder = new SparkClassCommandBuilder(className, args); } Map<String, String> env = new HashMap<>(); List<String> cmd = builder.buildCommand(env); if (printLaunchCommand) { System.err.println("Spark Command: " + join(" ", cmd)); System.err.println("========================================"); } if (isWindows()) { System.out.println(prepareWindowsCommand(cmd, env)); } else { // In bash, use NULL as the arg separator since it cannot be used in an argument. List<String> bashCmd = prepareBashCommand(cmd, env); for (String c : bashCmd) { System.out.print(c); System.out.print('\0'); } } }

从上面的分析我们可知,spark-submit 传递给 spark-class 的参数为 org.apache.spark.deploy.SparkSubmit,所以在 org.apache.spark.launcher.Main 执行的应该是

builder = new SparkSubmitCommandBuilder(args);

设置一些参数信息

继续往下执行 builder.buildCommand(env); 查看 buildCommand 内容

@Override public List<String> buildCommand(Map<String, String> env) throws IOException, IllegalArgumentException { if (PYSPARK_SHELL.equals(appResource) && isAppResourceReq) { return buildPySparkShellCommand(env); } else if (SPARKR_SHELL.equals(appResource) && isAppResourceReq) { return buildSparkRCommand(env); } else { return buildSparkSubmitCommand(env); } }

判断启动时的哪种环境 py,shell,or submit 然后构建命令,继续查看 buildSparkSubmitCommand 函数

其中有如下代码:

... addPermGenSizeOpt(cmd); cmd.add("org.apache.spark.deploy.SparkSubmit"); cmd.addAll(buildSparkSubmitArgs()); return cmd;

好了继续查看 org.apache.spark.deploy.SparkSubmit

spark main 线程 dump 信息

"main" #1 prio=5 os_prio=31 tid=0x00007f807180c800 nid=0x1c03 runnable [0x0000700003b08000] java.lang.Thread.State: RUNNABLE at java.io.FileInputStream.read0(Native Method) at java.io.FileInputStream.read(FileInputStream.java:207) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:169) - locked <0x00000007830bf508> (a jline.internal.NonBlockingInputStream) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:137) at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:246) at jline.internal.InputStreamReader.read(InputStreamReader.java:261) - locked <0x00000007830bf508> (a jline.internal.NonBlockingInputStream) at jline.internal.InputStreamReader.read(InputStreamReader.java:198) - locked <0x00000007830bf508> (a jline.internal.NonBlockingInputStream) at jline.console.ConsoleReader.readCharacter(ConsoleReader.java:2145) at jline.console.ConsoleReader.readLine(ConsoleReader.java:2349) at jline.console.ConsoleReader.readLine(ConsoleReader.java:2269) at scala.tools.nsc.interpreter.jline.InteractiveReader.readOneLine(JLineReader.scala:57) at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2.apply(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2.apply(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.InteractiveReader$.restartSysCalls(InteractiveReader.scala:44) at scala.tools.nsc.interpreter.InteractiveReader$class.readLine(InteractiveReader.scala:37) at scala.tools.nsc.interpreter.jline.InteractiveReader.readLine(JLineReader.scala:28) at scala.tools.nsc.interpreter.ILoop.readOneLine(ILoop.scala:404) at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:413) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97) at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909) at org.apache.spark.repl.Main$.doMain(Main.scala:68) at org.apache.spark.repl.Main$.main(Main.scala:51) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Locked ownable synchronizers: - None

从堆栈中信息中我们可以看出程序的调用顺序:SparkSubmit.main => repl.Main.main => ILoop.process
ILoop.process 中如下代码:

def process(settings: Settings): Boolean = savingContextLoader { this.settings = settings createInterpreter() // sets in to some kind of reader depending on environmental cues in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true)) globalFuture = future { intp.initializeSynchronous() loopPostInit() !intp.reporter.hasErrors } loadFiles(settings) printWelcome() try loop() match { case LineResults.EOF => out print Properties.shellInterruptedString case _ => } catch AbstractOrMissingHandler() finally closeInterpreter() true }

在 process 中我们发现调用了 loadFiles 并且打印 Welcome 信息

SparkLoop 继承了 loadFiles 并且复写了 loadFiles 方法 如下:

override def loadFiles(settings: Settings): Unit = { initializeSpark() super.loadFiles(settings) }

在 loadFiles 中调度 initalizeSpark ,查看源码如下:

def initializeSpark() { intp.beQuietDuring { processLine(""" @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { org.apache.spark.repl.Main.sparkSession } else { org.apache.spark.repl.Main.createSparkSession() } @transient val sc = { val _sc = spark.sparkContext _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}")) println("Spark context available as 'sc' " + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") println("Spark session available as 'spark'.") _sc } """) processLine("import org.apache.spark.SparkContext._") processLine("import spark.implicits._") processLine("import spark.sql") processLine("import org.apache.spark.sql.functions._") replayCommandStack = Nil // remove above commands from session history. } }

从上面可以看出,如果 SparkSession 已存在,那么直接返回,否则调用 createSparkSession

最后从 SparkSession 中返回 SparkContext 查看 createSparkSession 源码

def createSparkSession(): SparkSession = { val execUri = System.getenv("SPARK_EXECUTOR_URI") conf.setIfMissing("spark.app.name", "Spark shell") // SparkContext will detect this configuration and register it with the RpcEnv's // file server, setting spark.repl.class.uri to the actual URI for executors to // use. This is sort of ugly but since executors are started as part of SparkContext // initialization in certain cases, there's an initialization order issue that prevents // this from being set after SparkContext is instantiated. conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath()) if (execUri != null) { conf.set("spark.executor.uri", execUri) } if (System.getenv("SPARK_HOME") != null) { conf.setSparkHome(System.getenv("SPARK_HOME")) } val builder = SparkSession.builder.config(conf) if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive") { if (SparkSession.hiveClassesArePresent) { // In the case that the property is not set at all, builder's config // does not have this value set to 'hive' yet. The original default // behavior is that when there are hive classes, we use hive catalog. sparkSession = builder.enableHiveSupport().getOrCreate() logInfo("Created Spark session with Hive support") } else { // Need to change it back to 'in-memory' if no hive classes are found // in the case that the property is set to hive in spark-defaults.conf builder.config(CATALOG_IMPLEMENTATION.key, "in-memory") sparkSession = builder.getOrCreate() logInfo("Created Spark session") } } else { // In the case that the property is set but not to 'hive', the internal // default is 'in-memory'. So the sparkSession will use in-memory catalog. sparkSession = builder.getOrCreate() logInfo("Created Spark session") } sparkContext = sparkSession.sparkContext Signaling.cancelOnInterrupt(sparkContext) sparkSession }

这里最后使用 SparkConf 设置一些必要的参数并且通过 Builder 创建 sparkSession 并且判断是否需要启用 Hive 的支持。

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 563 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Access
    1 引用 • 3 回帖 • 2 关注
  • RIP

    愿逝者安息!

    8 引用 • 92 回帖 • 400 关注
  • SendCloud

    SendCloud 由搜狐武汉研发中心孵化的项目,是致力于为开发者提供高质量的触发邮件服务的云端邮件发送平台,为开发者提供便利的 API 接口来调用服务,让邮件准确迅速到达用户收件箱并获得强大的追踪数据。

    2 引用 • 8 回帖 • 499 关注
  • Kotlin

    Kotlin 是一种在 Java 虚拟机上运行的静态类型编程语言,由 JetBrains 设计开发并开源。Kotlin 可以编译成 Java 字节码,也可以编译成 JavaScript,方便在没有 JVM 的设备上运行。在 Google I/O 2017 中,Google 宣布 Kotlin 成为 Android 官方开发语言。

    19 引用 • 33 回帖 • 74 关注
  • App

    App(应用程序,Application 的缩写)一般指手机软件。

    91 引用 • 384 回帖
  • SMTP

    SMTP(Simple Mail Transfer Protocol)即简单邮件传输协议,它是一组用于由源地址到目的地址传送邮件的规则,由它来控制信件的中转方式。SMTP 协议属于 TCP/IP 协议簇,它帮助每台计算机在发送或中转信件时找到下一个目的地。

    4 引用 • 18 回帖 • 634 关注
  • 自由行
    2 关注
  • SVN

    SVN 是 Subversion 的简称,是一个开放源代码的版本控制系统,相较于 RCS、CVS,它采用了分支管理系统,它的设计目标就是取代 CVS。

    29 引用 • 98 回帖 • 688 关注
  • Sym

    Sym 是一款用 Java 实现的现代化社区(论坛/BBS/社交网络/博客)系统平台。

    下一代的社区系统,为未来而构建

    524 引用 • 4601 回帖 • 706 关注
  • 星云链

    星云链是一个开源公链,业内简单的将其称为区块链上的谷歌。其实它不仅仅是区块链搜索引擎,一个公链的所有功能,它基本都有,比如你可以用它来开发部署你的去中心化的 APP,你可以在上面编写智能合约,发送交易等等。3 分钟快速接入星云链 (NAS) 测试网

    3 引用 • 16 回帖
  • SOHO

    为成为自由职业者在家办公而努力吧!

    7 引用 • 55 回帖 • 4 关注
  • BAE

    百度应用引擎(Baidu App Engine)提供了 PHP、Java、Python 的执行环境,以及云存储、消息服务、云数据库等全面的云服务。它可以让开发者实现自动地部署和管理应用,并且提供动态扩容和负载均衡的运行环境,让开发者不用考虑高成本的运维工作,只需专注于业务逻辑,大大降低了开发者学习和迁移的成本。

    19 引用 • 75 回帖 • 666 关注
  • Angular

    AngularAngularJS 的新版本。

    26 引用 • 66 回帖 • 549 关注
  • PWA

    PWA(Progressive Web App)是 Google 在 2015 年提出、2016 年 6 月开始推广的项目。它结合了一系列现代 Web 技术,在网页应用中实现和原生应用相近的用户体验。

    14 引用 • 69 回帖 • 175 关注
  • CloudFoundry

    Cloud Foundry 是 VMware 推出的业界第一个开源 PaaS 云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题。

    5 引用 • 18 回帖 • 181 关注
  • Mobi.css

    Mobi.css is a lightweight, flexible CSS framework that focus on mobile.

    1 引用 • 6 回帖 • 759 关注
  • API

    应用程序编程接口(Application Programming Interface)是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工作机制的细节。

    79 引用 • 431 回帖 • 1 关注
  • FreeMarker

    FreeMarker 是一款好用且功能强大的 Java 模版引擎。

    23 引用 • 20 回帖 • 466 关注
  • 安全

    安全永远都不是一个小问题。

    203 引用 • 818 回帖 • 1 关注
  • 工具

    子曰:“工欲善其事,必先利其器。”

    298 引用 • 763 回帖
  • 新人

    让我们欢迎这对新人。哦,不好意思说错了,让我们欢迎这位新人!
    新手上路,请谨慎驾驶!

    52 引用 • 228 回帖
  • LaTeX

    LaTeX(音译“拉泰赫”)是一种基于 ΤΕΧ 的排版系统,由美国计算机学家莱斯利·兰伯特(Leslie Lamport)在 20 世纪 80 年代初期开发,利用这种格式,即使使用者没有排版和程序设计的知识也可以充分发挥由 TeX 所提供的强大功能,能在几天,甚至几小时内生成很多具有书籍质量的印刷品。对于生成复杂表格和数学公式,这一点表现得尤为突出。因此它非常适用于生成高印刷质量的科技和数学类文档。

    12 引用 • 54 回帖 • 8 关注
  • FFmpeg

    FFmpeg 是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。

    23 引用 • 32 回帖
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 563 关注
  • 知乎

    知乎是网络问答社区,连接各行各业的用户。用户分享着彼此的知识、经验和见解,为中文互联网源源不断地提供多种多样的信息。

    10 引用 • 66 回帖 • 1 关注
  • 招聘

    哪里都缺人,哪里都不缺人。

    188 引用 • 1057 回帖 • 1 关注
  • InfluxDB

    InfluxDB 是一个开源的没有外部依赖的时间序列数据库。适用于记录度量,事件及实时分析。

    2 引用 • 91 关注