The distributed environment set up earlier was mainly to help understand the Spark cluster architecture. For day-to-day testing a local environment is usually enough: there is no need to submit jobs, a process that is tedious and time-consuming. Only examples that genuinely have to run on a cluster need to be submitted to a (distributed or pseudo-distributed) cluster.
Setting up a local Spark environment
1. IDE: IntelliJ IDEA 2017. Register it with a license server (fill in the server URL), then install the Scala plugin from the plugin installation screen.
2. JDK 1.8: supports lambda expressions and functional-style programming.
3. Spark 2.x: download the package prebuilt with Hadoop from the official site, unpack it locally, and configure the environment variables: set SPARK_HOME and add the bin and sbin directories to PATH.
SPARK_HOME D:\Applications\spark-2.1.1-bin-hadoop2.7
path:%SPARK_HOME%\bin
path:%SPARK_HOME%\sbin
4. Open a terminal and run spark-shell. If you see output like the following, the installation works:
C:\Users\rzx>spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/07/18 10:54:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/18 10:54:24 WARN General: Plugin (Bundle) "org.datanucleus" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/D:/Applications/spark-2.1.1-bin-hadoop2.7/bin/../jars/datanucleus-core-3.2.10.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/D:/Applications/spark-2.1.1-bin-hadoop2.7/jars/datanucleus-core-3.2.10.jar."
17/07/18 10:54:24 WARN General: Plugin (Bundle) "org.datanucleus.api.jdo" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/D:/Applications/spark-2.1.1-bin-hadoop2.7/jars/datanucleus-api-jdo-3.2.6.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/D:/Applications/spark-2.1.1-bin-hadoop2.7/bin/../jars/datanucleus-api-jdo-3.2.6.jar."
17/07/18 10:54:24 WARN General: Plugin (Bundle) "org.datanucleus.store.rdbms" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/D:/Applications/spark-2.1.1-bin-hadoop2.7/jars/datanucleus-rdbms-3.2.9.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/D:/Applications/spark-2.1.1-bin-hadoop2.7/bin/../jars/datanucleus-rdbms-3.2.9.jar."
17/07/18 10:54:35 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://ip:4040
Spark context available as 'sc' (master = local[*], app id = local-1500346461072).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
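As a quick sanity check, you can evaluate a small RDD right at the scala> prompt. The snippet below is a minimal sketch; sc is the SparkContext that spark-shell creates for you.
val nums = sc.parallelize(1 to 100)        // build a small in-memory RDD
println(nums.filter(_ % 2 == 0).count())   // counts the even numbers; should print 50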
5. Open IDEA and create a project: New -> Project -> Scala -> enter the project name -> choose the JDK and Scala SDK -> Finish.
6. After creating the project, import the Spark jars: right-click the project -> Open Module Settings -> Libraries -> click + -> choose Java -> navigate to the SPARK_HOME directory, select the jars directory -> OK.
7. Instead of steps 5 and 6 you can also create a Maven project and pull the jars in as dependencies, though resolving them is slower; a build-tool setup is sketched just below.
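For reference, the dependency declaration itself is short. The sketch below uses sbt rather than the Maven project mentioned above (the same coordinates apply in either tool) and assumes the Spark 2.1.1 / Scala 2.11 versions used in this post; the project name is only a placeholder.
// build.sbt (sbt equivalent of the Maven dependency mentioned above)
name := "spark-wordcount"              // hypothetical project name
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.1"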
The first Spark program: WordCount
/*
---------------------------------------------------------
input/words:
hello world
hello you
hello world
hello you
---------------------------------------------------------
*/
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("WordCountLocal")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // read the input file; each element is one line
        JavaRDD<String> lines = sc.textFile("input/words");
        // split every line into single words
        JavaRDD<String> words =
                lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // map each word to (word, 1), then sum the counts per key
        JavaPairRDD<String, Integer> counts =
                words.mapToPair(w -> new Tuple2<>(w, 1))
                     .reduceByKey((x, y) -> x + y);
        counts.foreach(cn -> System.out.println(cn._1 + " " + cn._2));
        /*
        hello 4
        world 2
        you 2
        */
        sc.close();
    }
}
// Scala version
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("input/words")
    val words = lines.flatMap(line => line.split(" "))
    val pairs = words.map(word => (word, 1))
    println(pairs) // prints only the RDD's toString, not its contents
    // sum the counts per word, then sort by the word itself
    val wordCounts = pairs.reduceByKey(_ + _).sortByKey()
    wordCounts.foreach(wordcount => println(wordcount._1 + " - " + wordcount._2))
  }
}
How WordCount works
WordCount breaks down into four steps:
- Read the file and build the lines RDD
- Split each line into individual words
- Map each word to a (word, 1) pair
- Aggregate: values are first combined by key within each partition, then the partial results are combined across partitions, yielding the final word counts
In the figure above, each row is a node and each column is an RDD. A single RDD can be spread across several nodes, and the piece of data on each node is called a partition. map and flatMap operate within a partition, whereas reduceByKey has to reorganize data across partitions; that reorganization is a shuffle.
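To observe this yourself, you can tag every element with the index of the partition it lives in, before and after reduceByKey. The snippet below is a small sketch meant to be pasted into spark-shell against the same input/words file; forcing two partitions is purely for illustration.
val lines = sc.textFile("input/words", 2)                  // force 2 partitions
val pairs = lines.flatMap(_.split(" ")).map((_, 1))
// before the shuffle: pairs for the same word may sit in different partitions
pairs.mapPartitionsWithIndex((i, it) => it.map(p => (i, p))).collect().foreach(println)
val counts = pairs.reduceByKey(_ + _)                       // triggers a shuffle
// after the shuffle: all pairs with the same key end up in the same partition
counts.mapPartitionsWithIndex((i, it) => it.map(p => (i, p))).collect().foreach(println)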