Setting up a local Spark environment and WordCount


The earlier distributed setup was mainly for understanding the Spark cluster architecture. For everyday testing a local environment is enough: there is no need to submit jobs, which is a cumbersome and time-consuming process. Only examples that genuinely have to run against a cluster need to be submitted to one (distributed or pseudo-distributed).

Setting up the local Spark environment

1. IDE: IntelliJ IDEA 2017. Register it with a license server (enter the server URL), then install the Scala plugin from the plugin installation screen.
2. JDK 1.8: needed for lambda expressions and functional-style code.
3. Spark 2.x: download the package bundled with Hadoop from the official site, unpack it locally, and set the environment variables: SPARK_HOME, plus the bin and sbin directories on PATH:

SPARK_HOME D:\Applications\spark-2.1.1-bin-hadoop2.7
path:%SPARK_HOME%\bin
path:%SPARK_HOME%\sbin

4. Open a terminal and run spark-shell; if you see output like the following, the setup succeeded:

C:\Users\rzx>spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/07/18 10:54:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/18 10:54:24 WARN General: Plugin (Bundle) "org.datanucleus" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/D:/Applications/spark-2.1.1-bin-hadoop2.7/bin/../jars/datanucleus-core-3.2.10.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/D:/Applications/spark-2.1.1-bin-hadoop2.7/jars/datanucleus-core-3.2.10.jar."
17/07/18 10:54:24 WARN General: Plugin (Bundle) "org.datanucleus.api.jdo" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/D:/Applications/spark-2.1.1-bin-hadoop2.7/jars/datanucleus-api-jdo-3.2.6.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/D:/Applications/spark-2.1.1-bin-hadoop2.7/bin/../jars/datanucleus-api-jdo-3.2.6.jar."
17/07/18 10:54:24 WARN General: Plugin (Bundle) "org.datanucleus.store.rdbms" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/D:/Applications/spark-2.1.1-bin-hadoop2.7/jars/datanucleus-rdbms-3.2.9.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/D:/Applications/spark-2.1.1-bin-hadoop2.7/bin/../jars/datanucleus-rdbms-3.2.9.jar."
17/07/18 10:54:35 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://ip:4040
Spark context available as 'sc' (master = local[*], app id = local-1500346461072).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
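As a quick sanity check that the shell is actually working, you can run a tiny job right at the prompt. A minimal sketch (sc is the SparkContext that spark-shell creates for you):

// type at the scala> prompt
val nums = sc.parallelize(1 to 100)   // distribute the numbers 1..100 as an RDD
println(nums.sum())                   // should print 5050.0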

5. Open IDEA and create a project: New -> Project -> Scala -> enter a project name -> select the JDK and the Scala SDK -> Finish.
6. After the project is created, import Spark's jars: right-click the project -> Open Module Settings -> Libraries -> click + -> choose Java -> navigate to the SPARK_HOME directory and select its jars folder -> OK.
7. Instead of steps 5 and 6 you can also create a Maven (or sbt) project and pull the jars in as dependencies, though resolving them is rather slow; a dependency sketch is shown below.
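As an aside to step 7, here is a hedged sketch of what the dependency declaration could look like. It uses sbt rather than Maven for brevity (the artifact coordinates are identical either way), and the versions are assumptions chosen to match the spark-2.1.1-bin-hadoop2.7 / Scala 2.11.8 build used above:

// build.sbt (Maven equivalent: groupId org.apache.spark, artifactId spark-core_2.11, version 2.1.1)
name := "spark-wordcount"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.1"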

The first Spark program: WordCount

/*
---------------------------------------------------------
input/words:
hello world
hello you
hello world
hello you
---------------------------------------------------------
*/
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("WordCountLocal")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // read the input file, one RDD element per line
        JavaRDD<String> lines = sc.textFile("input/words");
        // split each line into individual words
        JavaRDD<String> words =
                lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // map each word to (word, 1) and sum the counts per key
        JavaPairRDD<String, Integer> counts =
                words.mapToPair(w -> new Tuple2<>(w, 1))
                        .reduceByKey((x, y) -> x + y);
        counts.foreach(cn -> System.out.println(cn._1 + " " + cn._2));
        sc.close();
    }
}
/*
Expected output (order may vary):
hello 4
world 2
you 2
*/
// The same program in Scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // read the input file, one RDD element per line
    val lines = sc.textFile("input/words")
    // split each line into individual words
    val words = lines.flatMap(line => line.split(" "))
    // map each word to (word, 1)
    val pairs = words.map(word => (word, 1))
    println(pairs) // prints only the RDD's description, not its contents
    // sum the counts per key and sort the result by word
    val wordCounts = pairs.reduceByKey(_ + _).sortByKey()
    wordCounts.foreach(wordcount => println(wordcount._1 + " - " + wordcount._2))
    sc.stop()
  }
}

How WordCount works

(Figure: WordCount data flow; each row is a node, each column is an RDD, from textFile through flatMap, map, and reduceByKey.)
WordCount boils down to four steps:

  • Read the file and build the lines RDD
  • Split each line into individual words
  • Map each word into the (word, 1) form
  • Aggregate: values are first combined by key within each partition, then combined across partitions to produce the final word counts

In the figure above each row is a node and each column is an RDD, so a single RDD can be spread across several nodes; the slice of data on each node is called a partition. map and flatMap operate entirely within a partition, whereas reduceByKey reorganizes data across partitions, a process known as a shuffle.
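To see that shuffle boundary for yourself, you can print the RDD lineage with toDebugString. A minimal sketch to run in spark-shell against the same input/words file (the two-partition setting is only there to make the stages visible):

val lines = sc.textFile("input/words", 2)   // ask for 2 partitions
val counts = lines
  .flatMap(_.split(" "))                    // narrow: stays inside each partition
  .map((_, 1))                              // narrow: stays inside each partition
  .reduceByKey(_ + _)                       // wide: shuffles data across partitions
println(counts.toDebugString)               // the lineage shows a ShuffledRDD stage boundary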

  • Spark

    Spark is a general-purpose parallel framework in the style of Hadoop MapReduce, open-sourced by UC Berkeley's AMP Lab. It keeps the advantages of Hadoop MapReduce, but unlike MapReduce it can hold a job's intermediate results in memory, removing the need to read and write HDFS between steps. That makes Spark a much better fit for iterative MapReduce-style algorithms such as those used in data mining and machine learning.

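A minimal sketch of the in-memory reuse mentioned above, runnable in spark-shell (the file path and the word being filtered are placeholders for illustration): caching an RDD keeps it in memory after the first action, so later actions reuse it instead of re-reading the input.

val words = sc.textFile("input/words").flatMap(_.split(" "))
val cached = words.cache()                    // mark the RDD to be kept in memory
println(cached.count())                       // first action: reads the file and fills the cache
println(cached.filter(_ == "hello").count())  // second action: computed from the cached data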
