Let's start by looking at what spark-shell actually does. The spark-shell script contains the following function:
function main() {
  if $cygwin; then
    # Workaround for issue involving JLine and Cygwin
    # (see http://sourceforge.net/p/jline/bugs/40/).
    # If you're using the Mintty terminal emulator in Cygwin, may need to set the
    # "Backspace sends ^H" setting in "Keys" section of the Mintty options
    # (see https://github.com/sbt/sbt/issues/562).
    stty -icanon min 1 -echo > /dev/null 2>&1
    export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
    "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
    stty icanon echo > /dev/null 2>&1
  else
    export SPARK_SUBMIT_OPTS
    "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
  fi
}
The script above simply runs spark-submit, passing org.apache.spark.repl.Main as the class to launch. Next, look at the spark-submit script:
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
Very straightforward: it execs spark-class, passing org.apache.spark.deploy.SparkSubmit plus the original arguments. Continue with the spark-class script:
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
. "${SPARK_HOME}"/bin/load-spark-env.sh
It first sources load-spark-env.sh to load environment variables; we will come back to that script later.
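(Roughly speaking, load-spark-env.sh sources conf/spark-env.sh if it exists and works out which Scala build of Spark to use; the details do not matter for the rest of this walkthrough.)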
......
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}
......
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")
......
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
From the above we can see that what gets executed is org.apache.spark.launcher.Main, so let's open that class and look at its code:
public static void main(String[] argsArray) throws Exception {
  checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

  List<String> args = new ArrayList<>(Arrays.asList(argsArray));
  String className = args.remove(0);

  boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
  AbstractCommandBuilder builder;
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    try {
      builder = new SparkSubmitCommandBuilder(args);
    } catch (IllegalArgumentException e) {
      printLaunchCommand = false;
      System.err.println("Error: " + e.getMessage());
      System.err.println();

      MainClassOptionParser parser = new MainClassOptionParser();
      try {
        parser.parse(args);
      } catch (Exception ignored) {
        // Ignore parsing exceptions.
      }

      List<String> help = new ArrayList<>();
      if (parser.className != null) {
        help.add(parser.CLASS);
        help.add(parser.className);
      }
      help.add(parser.USAGE_ERROR);
      builder = new SparkSubmitCommandBuilder(help);
    }
  } else {
    builder = new SparkClassCommandBuilder(className, args);
  }

  Map<String, String> env = new HashMap<>();
  List<String> cmd = builder.buildCommand(env);
  if (printLaunchCommand) {
    System.err.println("Spark Command: " + join(" ", cmd));
    System.err.println("========================================");
  }

  if (isWindows()) {
    System.out.println(prepareWindowsCommand(cmd, env));
  } else {
    // In bash, use NULL as the arg separator since it cannot be used in an argument.
    List<String> bashCmd = prepareBashCommand(cmd, env);
    for (String c : bashCmd) {
      System.out.print(c);
      System.out.print('\0');
    }
  }
}
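Note the printLaunchCommand flag in the code above: setting the SPARK_PRINT_LAUNCH_COMMAND environment variable before starting spark-shell makes the launcher print the fully assembled "Spark Command: ..." line to stderr, which is a convenient way to verify the analysis that follows.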
From the analysis so far we know that spark-submit passes org.apache.spark.deploy.SparkSubmit as the class name to spark-class, so inside org.apache.spark.launcher.Main the branch that runs is
builder = new SparkSubmitCommandBuilder(args);
which parses the submit arguments and records them on the builder. Execution then continues with builder.buildCommand(env); let's look at buildCommand:
@Override
public List<String> buildCommand(Map<String, String> env)
    throws IOException, IllegalArgumentException {
  if (PYSPARK_SHELL.equals(appResource) && isAppResourceReq) {
    return buildPySparkShellCommand(env);
  } else if (SPARKR_SHELL.equals(appResource) && isAppResourceReq) {
    return buildSparkRCommand(env);
  } else {
    return buildSparkSubmitCommand(env);
  }
}
It checks which front end is being launched (PySpark shell, SparkR shell, or a regular submit) and builds the corresponding command. For spark-shell the last branch is taken, so let's continue into buildSparkSubmitCommand, which ends with the following code:
...
addPermGenSizeOpt(cmd);
cmd.add("org.apache.spark.deploy.SparkSubmit");
cmd.addAll(buildSparkSubmitArgs());
return cmd;
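At this point the command list is, roughly, a java invocation with the assembled classpath and JVM options, followed by org.apache.spark.deploy.SparkSubmit and the original arguments (--class org.apache.spark.repl.Main --name "Spark shell" ...). That is exactly the command spark-class reads back over the NUL-delimited pipe and exec's.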
Now let's move on to org.apache.spark.deploy.SparkSubmit itself. Rather than reading it line by line, here is a dump of the Spark main thread taken while the shell is waiting at the prompt; it shows the whole call chain at once:
"main" #1 prio=5 os_prio=31 tid=0x00007f807180c800 nid=0x1c03 runnable [0x0000700003b08000]
java.lang.Thread.State: RUNNABLE
at java.io.FileInputStream.read0(Native Method)
at java.io.FileInputStream.read(FileInputStream.java:207)
at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:169)
- locked <0x00000007830bf508> (a jline.internal.NonBlockingInputStream)
at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:137)
at jline.internal.NonBlockingInputStream.read(NonBlockingInputStream.java:246)
at jline.internal.InputStreamReader.read(InputStreamReader.java:261)
- locked <0x00000007830bf508> (a jline.internal.NonBlockingInputStream)
at jline.internal.InputStreamReader.read(InputStreamReader.java:198)
- locked <0x00000007830bf508> (a jline.internal.NonBlockingInputStream)
at jline.console.ConsoleReader.readCharacter(ConsoleReader.java:2145)
at jline.console.ConsoleReader.readLine(ConsoleReader.java:2349)
at jline.console.ConsoleReader.readLine(ConsoleReader.java:2269)
at scala.tools.nsc.interpreter.jline.InteractiveReader.readOneLine(JLineReader.scala:57)
at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2.apply(InteractiveReader.scala:37)
at scala.tools.nsc.interpreter.InteractiveReader$$anonfun$readLine$2.apply(InteractiveReader.scala:37)
at scala.tools.nsc.interpreter.InteractiveReader$.restartSysCalls(InteractiveReader.scala:44)
at scala.tools.nsc.interpreter.InteractiveReader$class.readLine(InteractiveReader.scala:37)
at scala.tools.nsc.interpreter.jline.InteractiveReader.readLine(JLineReader.scala:28)
at scala.tools.nsc.interpreter.ILoop.readOneLine(ILoop.scala:404)
at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:413)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
at org.apache.spark.repl.Main$.doMain(Main.scala:68)
at org.apache.spark.repl.Main$.main(Main.scala:51)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Locked ownable synchronizers:
- None
From the stack we can read off the call order: SparkSubmit.main => repl.Main.main => ILoop.process.
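The frames between SparkSubmit and repl.Main (Method.invoke and the NativeMethodAccessorImpl calls) show that SparkSubmit's runMain invokes the user class reflectively rather than calling it directly. Here is a minimal sketch of that pattern, with hypothetical wiring and none of SparkSubmit's classloader or error handling:

object ReflectiveMainSketch {
  // Load a class by name and call its static main(String[]) method, the way runMain does.
  def invokeMain(className: String, args: Array[String]): Unit = {
    val clazz = Class.forName(className)
    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, args.asInstanceOf[AnyRef])   // pass the whole array as the single String[] argument
  }

  def main(args: Array[String]): Unit =
    invokeMain("org.apache.spark.repl.Main", args)       // requires spark-repl on the classpath
}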
ILoop.process contains the following code:
def process(settings: Settings): Boolean = savingContextLoader {
  this.settings = settings
  createInterpreter()

  // sets in to some kind of reader depending on environmental cues
  in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true))
  globalFuture = future {
    intp.initializeSynchronous()
    loopPostInit()
    !intp.reporter.hasErrors
  }
  loadFiles(settings)
  printWelcome()

  try loop() match {
    case LineResults.EOF => out print Properties.shellInterruptedString
    case _ =>
  }
  catch AbstractOrMissingHandler()
  finally closeInterpreter()
  true
}
In process we can see that loadFiles is called and the welcome message is printed.
SparkILoop extends ILoop and overrides the loadFiles method as follows:
override def loadFiles(settings: Settings): Unit = {
  initializeSpark()
  super.loadFiles(settings)
}
loadFiles calls initializeSpark before delegating to the parent implementation; its source is:
def initializeSpark() {
  intp.beQuietDuring {
    processLine("""
      @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
          org.apache.spark.repl.Main.sparkSession
        } else {
          org.apache.spark.repl.Main.createSparkSession()
        }
      @transient val sc = {
        val _sc = spark.sparkContext
        _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
        println("Spark context available as 'sc' " +
          s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
        println("Spark session available as 'spark'.")
        _sc
      }
      """)
    processLine("import org.apache.spark.SparkContext._")
    processLine("import spark.implicits._")
    processLine("import spark.sql")
    processLine("import org.apache.spark.sql.functions._")
    replayCommandStack = Nil // remove above commands from session history.
  }
}
These lines are fed to the interpreter through processLine, exactly as if the user had typed them, which is why spark and sc are already defined when the prompt appears. If a SparkSession already exists it is reused; otherwise createSparkSession is called, and sc is simply the SparkContext taken from that session. Let's look at the createSparkSession source:
def createSparkSession(): SparkSession = {
  val execUri = System.getenv("SPARK_EXECUTOR_URI")
  conf.setIfMissing("spark.app.name", "Spark shell")
  // SparkContext will detect this configuration and register it with the RpcEnv's
  // file server, setting spark.repl.class.uri to the actual URI for executors to
  // use. This is sort of ugly but since executors are started as part of SparkContext
  // initialization in certain cases, there's an initialization order issue that prevents
  // this from being set after SparkContext is instantiated.
  conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
  if (execUri != null) {
    conf.set("spark.executor.uri", execUri)
  }
  if (System.getenv("SPARK_HOME") != null) {
    conf.setSparkHome(System.getenv("SPARK_HOME"))
  }

  val builder = SparkSession.builder.config(conf)
  if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive") {
    if (SparkSession.hiveClassesArePresent) {
      // In the case that the property is not set at all, builder's config
      // does not have this value set to 'hive' yet. The original default
      // behavior is that when there are hive classes, we use hive catalog.
      sparkSession = builder.enableHiveSupport().getOrCreate()
      logInfo("Created Spark session with Hive support")
    } else {
      // Need to change it back to 'in-memory' if no hive classes are found
      // in the case that the property is set to hive in spark-defaults.conf
      builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
      sparkSession = builder.getOrCreate()
      logInfo("Created Spark session")
    }
  } else {
    // In the case that the property is set but not to 'hive', the internal
    // default is 'in-memory'. So the sparkSession will use in-memory catalog.
    sparkSession = builder.getOrCreate()
    logInfo("Created Spark session")
  }
  sparkContext = sparkSession.sparkContext
  Signaling.cancelOnInterrupt(sparkContext)
  sparkSession
}
In the end, the method fills in the required settings on the SparkConf, builds the SparkSession through the builder, enabling Hive support only when the catalog implementation is "hive" and the Hive classes are actually on the classpath, and stores the resulting SparkContext. The spark and sc values the shell greets you with are exactly the objects created here.
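For comparison, here is a minimal sketch of the same builder pattern written the way a standalone application would use it; the app name and master are placeholder values, and the Hive branch is only a simplified imitation of the REPL's check:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSessionSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder app name and master; the REPL gets these from spark-submit instead.
    val conf = new SparkConf()
      .setIfMissing("spark.app.name", "session-sketch")
      .setIfMissing("spark.master", "local[*]")

    val builder = SparkSession.builder.config(conf)

    // Mirror the REPL's decision: use the Hive catalog when the Hive classes are
    // available, otherwise fall back to the in-memory catalog.
    val spark =
      try builder.enableHiveSupport().getOrCreate()
      catch { case _: IllegalArgumentException => builder.getOrCreate() }

    val sc = spark.sparkContext   // this is what the shell binds to `sc`
    println(s"master = ${sc.master}, app id = ${sc.applicationId}")
    spark.stop()
  }
}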