Spark 学习之提交任务(六)

本贴最后更新于 2152 天前,其中的信息可能已经天翻地覆

本篇文章主要记录 Spark 的任务提交到集群上的过程

http://itechor.top/solo/articles/2018/12/17/1545016407680.html 这篇文章搭建好的集群环境上,进行任务的提交运行。

新建一个 maven 项目,以统计用户身高性别等为主,pom.xml 添加以下依赖:

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  <java.version>1.8></java.version>
  <spark.version>2.4.0></spark.version>
</properties>

<dependencies>
  <dependency>
	<groupId>com.thoughtworks.paranamer</groupId>
	<artifactId>paranamer</artifactId>
	<version>2.8</version>
  </dependency>
  <dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.12</artifactId>
	<version>${spark.version}</version>
  </dependency>
  <dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.12</artifactId>
	<version>${spark.version}</version>
  </dependency>
  <dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>8.0.13</version>
  </dependency>
</dependencies>

<build>
  <plugins>
	<plugin>
	  <groupId>org.apache.maven.plugins</groupId>
	  <artifactId>maven-assembly-plugin</artifactId>
	  <version>3.1.0</version>
	  <configuration>
		<descriptorRefs>
		  <descriptorRef>jar-with-dependencies</descriptorRef>
		</descriptorRefs>
		<archive>
		  <manifest>
			<mainClass>xxx.yyy</mainClass>
		  </manifest>
		</archive>
	  </configuration>
	  <executions>
		<execution>
		  <id>make-assembly</id>
		  <phase>package</phase>
		  <goals>
			<goal>single</goal>
		  </goals>
		</execution>
	  </executions>
	</plugin>
	<plugin>
	  <groupId>org.apache.maven.plugins</groupId>
	  <artifactId>maven-compiler-plugin</artifactId>
	  <version>3.8.0</version>
	  <configuration>
		<source>1.8</source>
		<target>1.8</target>
	  </configuration>
	</plugin>
  </plugins>
</build>

配置数据库的配置信息,application.xml:

mysql.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
mysql.datasource.username=root
mysql.datasource.password=root
mysql.datasource.driverClassName=com.mysql.cj.jdbc.Driver

读取数据库配置文件 DataSourceUtil.java:

public class DataSourceUtil {
    public static String url(){
        return PropertyUtil.getInstance().getString("mysql.datasource.url");
  }
    public static String userName(){
        return PropertyUtil.getInstance().getString("mysql.datasource.username");
  }
    public static String passWord(){
        return PropertyUtil.getInstance().getString("mysql.datasource.password");
  }
    public static String driverClassName(){
        return PropertyUtil.getInstance().getString("mysql.datasource.driverClassName");
  }
}

PropertyUtil.java:

public class PropertyUtil {
    private PropertyUtil() {
    }
    private static class SingleTonHoler {
        private static ResourceBundle INSTANCE = ResourceBundle.getBundle("application");
  }
    public static ResourceBundle getInstance() {
        return SingleTonHoler.INSTANCE;
  }
}

MySQLService.java:

import cn.grgpay.analyze.util.DataSourceUtil;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*;

import java.io.Serializable;
import java.util.*;

public class MySQLService implements Serializable {
    private static final long serialVersionUID = 396720719322480114L;

    public static void main(String[] args) {
        readMySQL();

  }

    private static void readMySQL() {
        SparkSession session = SparkSession.builder().master("local[*]").appName("readMySQLToDay").config("spark.sql.warehouse.dir", "./spark-warehouse").getOrCreate();
  SQLContext sqlContext = session.sqlContext();

  Properties connectionProperties = new Properties();
  connectionProperties.put("user", DataSourceUtil.userName());
  connectionProperties.put("password", DataSourceUtil.passWord());
  connectionProperties.put("driver", DataSourceUtil.driverClassName());
 long start = System.currentTimeMillis();

  // 读取person表中所有数据
  Dataset data = sqlContext.read().jdbc(DataSourceUtil.url(), "person", connectionProperties).select("*");
 long end = System.currentTimeMillis();
  System.out.println("读取数据库数据【"+data.count()+"】条,耗时:"+((end-start)/1000));
  // 过滤出性别为男的数据
  Dataset maleData = data.filter(new FilterFunction() {
            private static final long serialVersionUID = -6182357065815734414L;

  @Override
  public boolean call(Row value) {
                String sex = value.getAs("sex");
 return sex.equals("男");
  }
        });
// 得到性别为男的身高数据
  Dataset maleHeightData = maleData.map(new MapFunction, Integer>() {
            private static final long serialVersionUID = -7881663810003682651L;

  @Override
  public Integer call(Row value) {
                return value.getAs("height");
  }
        }, Encoders.INT());

  // 全部男性身高相加
  Integer maleReduce = maleHeightData.reduce(new ReduceFunction() {
            private static final long serialVersionUID = -7419948477276929434L;

  @Override
  public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
  }
        });

  Dataset maleHeight = maleData.sort(maleData.col("height").desc());//男性身高倒序排序
  Dataset lowerMaleHeight = maleData.sort(maleData.col("height").asc());//男性身高升序排序
  System.out.println("男性平均身高:"+(maleReduce/maleHeightData.count())+",最高的男性身高为:" + maleHeight.first() + ",最矮:" + lowerMaleHeight.first());

// 过滤出性别为女的数据
  Dataset feMaleData = data.filter(new FilterFunction() {
            private static final long serialVersionUID = 6593222075687505570L;

  @Override
  public boolean call(Row value) {
                String sex = value.getAs("sex");
 return sex.equals("女");
  }
        });
// 得到性别为女的身高数据
  Dataset femaleHeightData = feMaleData.map(new MapFunction, Integer>() {
            private static final long serialVersionUID = -7881663810003682651L;

  @Override
  public Integer call(Row value) {
                return value.getAs("height");
  }
        }, Encoders.INT());

  // 全部女性身高相加
  Integer femaleReduce = femaleHeightData.reduce(new ReduceFunction() {
            private static final long serialVersionUID = -7419948477276929434L;

  @Override
  public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
  }
        });

  Dataset femaleHeight = feMaleData.sort(feMaleData.col("height").desc());//女性身高倒序排序
  Dataset lowerFemaleHeight = feMaleData.sort(feMaleData.col("height").asc());//女性身高升序排序
  System.out.println("女性平均身高:"+(femaleReduce/femaleHeightData.count())+",最高的女性身高为:" + femaleHeight.first() + ",最矮:" + lowerFemaleHeight.first());

  System.out.println("计算耗时:"+((System.currentTimeMillis()-end)/1000));

  }
}

如果本地有安装 Spark 服务,可以直接右键 Run 这个 main 函数即可计算出结果。

下面介绍一下提交任务到 Spark 集群中运行。

其实提交任务到 Spark 集群也很简单,先 maven 打包出 jar,把 jar 包上传到 Spark 的 Master 节点的任意目录,执行命令:

spark-submit --master spark://spark1:7077 --class xxx.yyy.MySQLService /usr/local/apps/test-1.0.jar

--master spark://spark1:7077
  这个是指定master节点的地址
--class xxx.yyy.MySQLService
  这个是指定执行那个类的main函数
/usr/local/apps/test-1.0.jar
  这个是指定jar包的路径

这样就可以提交任务到 Spark 集群里了。






扫一扫有惊喜: [![imagepng](http://itechor.top/solo/upload/bb791a58c3a84193b7f643b6849482c5_image.png) ](http://ym0214.com)
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 552 关注
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    93 引用 • 113 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Notion

    Notion - The all-in-one workspace for your notes, tasks, wikis, and databases.

    6 引用 • 38 回帖
  • 前端

    前端技术一般分为前端设计和前端开发,前端设计可以理解为网站的视觉设计,前端开发则是网站的前台代码实现,包括 HTML、CSS 以及 JavaScript 等。

    247 引用 • 1348 回帖
  • HBase

    HBase 是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了 Google 文件系统所提供的分布式数据存储一样,HBase 在 Hadoop 之上提供了类似于 Bigtable 的能力。

    17 引用 • 6 回帖 • 73 关注
  • Flume

    Flume 是一套分布式的、可靠的,可用于有效地收集、聚合和搬运大量日志数据的服务架构。

    9 引用 • 6 回帖 • 629 关注
  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:SymSoloVditor思源笔记

    1063 引用 • 3453 回帖 • 203 关注
  • GraphQL

    GraphQL 是一个用于 API 的查询语言,是一个使用基于类型系统来执行查询的服务端运行时(类型系统由你的数据定义)。GraphQL 并没有和任何特定数据库或者存储引擎绑定,而是依靠你现有的代码和数据支撑。

    4 引用 • 3 回帖 • 9 关注
  • IBM

    IBM(国际商业机器公司)或万国商业机器公司,简称 IBM(International Business Machines Corporation),总公司在纽约州阿蒙克市。1911 年托马斯·沃森创立于美国,是全球最大的信息技术和业务解决方案公司,拥有全球雇员 30 多万人,业务遍及 160 多个国家和地区。

    17 引用 • 53 回帖 • 136 关注
  • RYMCU

    RYMCU 致力于打造一个即严谨又活泼、专业又不失有趣,为数百万人服务的开源嵌入式知识学习交流平台。

    4 引用 • 6 回帖 • 51 关注
  • 小薇

    小薇是一个用 Java 写的 QQ 聊天机器人 Web 服务,可以用于社群互动。

    由于 Smart QQ 从 2019 年 1 月 1 日起停止服务,所以该项目也已经停止维护了!

    34 引用 • 467 回帖 • 742 关注
  • 心情

    心是产生任何想法的源泉,心本体会陷入到对自己本体不能理解的状态中,因为心能产生任何想法,不能分出对错,不能分出自己。

    59 引用 • 369 回帖
  • SVN

    SVN 是 Subversion 的简称,是一个开放源代码的版本控制系统,相较于 RCS、CVS,它采用了分支管理系统,它的设计目标就是取代 CVS。

    29 引用 • 98 回帖 • 680 关注
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    944 引用 • 1459 回帖 • 17 关注
  • 开源中国

    开源中国是目前中国最大的开源技术社区。传播开源的理念,推广开源项目,为 IT 开发者提供了一个发现、使用、并交流开源技术的平台。目前开源中国社区已收录超过两万款开源软件。

    7 引用 • 86 回帖
  • C

    C 语言是一门通用计算机编程语言,应用广泛。C 语言的设计目标是提供一种能以简易的方式编译、处理低级存储器、产生少量的机器码以及不需要任何运行环境支持便能运行的编程语言。

    85 引用 • 165 回帖 • 1 关注
  • Linux

    Linux 是一套免费使用和自由传播的类 Unix 操作系统,是一个基于 POSIX 和 Unix 的多用户、多任务、支持多线程和多 CPU 的操作系统。它能运行主要的 Unix 工具软件、应用程序和网络协议,并支持 32 位和 64 位硬件。Linux 继承了 Unix 以网络为核心的设计思想,是一个性能稳定的多用户网络操作系统。

    943 引用 • 943 回帖
  • 百度

    百度(Nasdaq:BIDU)是全球最大的中文搜索引擎、最大的中文网站。2000 年 1 月由李彦宏创立于北京中关村,致力于向人们提供“简单,可依赖”的信息获取方式。“百度”二字源于中国宋朝词人辛弃疾的《青玉案·元夕》词句“众里寻他千百度”,象征着百度对中文信息检索技术的执著追求。

    63 引用 • 785 回帖 • 175 关注
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 22 关注
  • 深度学习

    深度学习(Deep Learning)是机器学习的分支,是一种试图使用包含复杂结构或由多重非线性变换构成的多个处理层对数据进行高层抽象的算法。

    53 引用 • 40 回帖 • 2 关注
  • 链滴

    链滴是一个记录生活的地方。

    记录生活,连接点滴

    153 引用 • 3783 回帖 • 1 关注
  • Thymeleaf

    Thymeleaf 是一款用于渲染 XML/XHTML/HTML5 内容的模板引擎。类似 Velocity、 FreeMarker 等,它也可以轻易的与 Spring 等 Web 框架进行集成作为 Web 应用的模板引擎。与其它模板引擎相比,Thymeleaf 最大的特点是能够直接在浏览器中打开并正确显示模板页面,而不需要启动整个 Web 应用。

    11 引用 • 19 回帖 • 354 关注
  • GitHub

    GitHub 于 2008 年上线,目前,除了 Git 代码仓库托管及基本的 Web 管理界面以外,还提供了订阅、讨论组、文本渲染、在线文件编辑器、协作图谱(报表)、代码片段分享(Gist)等功能。正因为这些功能所提供的便利,又经过长期的积累,GitHub 的用户活跃度很高,在开源世界里享有深远的声望,并形成了社交化编程文化(Social Coding)。

    209 引用 • 2031 回帖 • 1 关注
  • CentOS

    CentOS(Community Enterprise Operating System)是 Linux 发行版之一,它是来自于 Red Hat Enterprise Linux 依照开放源代码规定释出的源代码所编译而成。由于出自同样的源代码,因此有些要求高度稳定的服务器以 CentOS 替代商业版的 Red Hat Enterprise Linux 使用。两者的不同在于 CentOS 并不包含封闭源代码软件。

    238 引用 • 224 回帖
  • Love2D

    Love2D 是一个开源的, 跨平台的 2D 游戏引擎。使用纯 Lua 脚本来进行游戏开发。目前支持的平台有 Windows, Mac OS X, Linux, Android 和 iOS。

    14 引用 • 53 回帖 • 531 关注
  • 游戏

    沉迷游戏伤身,强撸灰飞烟灭。

    176 引用 • 815 回帖
  • Sublime

    Sublime Text 是一款可以用来写代码、写文章的文本编辑器。支持代码高亮、自动完成,还支持通过插件进行扩展。

    10 引用 • 5 回帖
  • 互联网

    互联网(Internet),又称网际网络,或音译因特网、英特网。互联网始于 1969 年美国的阿帕网,是网络与网络之间所串连成的庞大网络,这些网络以一组通用的协议相连,形成逻辑上的单一巨大国际网络。

    98 引用 • 344 回帖
  • 工具

    子曰:“工欲善其事,必先利其器。”

    286 引用 • 729 回帖