Spark 学习之提交任务(六)

本贴最后更新于 1942 天前,其中的信息可能已经天翻地覆

本篇文章主要记录 Spark 的任务提交到集群上的过程

http://itechor.top/solo/articles/2018/12/17/1545016407680.html 这篇文章搭建好的集群环境上,进行任务的提交运行。

新建一个 maven 项目,以统计用户身高性别等为主,pom.xml 添加以下依赖:

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  <java.version>1.8></java.version>
  <spark.version>2.4.0></spark.version>
</properties>

<dependencies>
  <dependency>
	<groupId>com.thoughtworks.paranamer</groupId>
	<artifactId>paranamer</artifactId>
	<version>2.8</version>
  </dependency>
  <dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.12</artifactId>
	<version>${spark.version}</version>
  </dependency>
  <dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.12</artifactId>
	<version>${spark.version}</version>
  </dependency>
  <dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>8.0.13</version>
  </dependency>
</dependencies>

<build>
  <plugins>
	<plugin>
	  <groupId>org.apache.maven.plugins</groupId>
	  <artifactId>maven-assembly-plugin</artifactId>
	  <version>3.1.0</version>
	  <configuration>
		<descriptorRefs>
		  <descriptorRef>jar-with-dependencies</descriptorRef>
		</descriptorRefs>
		<archive>
		  <manifest>
			<mainClass>xxx.yyy</mainClass>
		  </manifest>
		</archive>
	  </configuration>
	  <executions>
		<execution>
		  <id>make-assembly</id>
		  <phase>package</phase>
		  <goals>
			<goal>single</goal>
		  </goals>
		</execution>
	  </executions>
	</plugin>
	<plugin>
	  <groupId>org.apache.maven.plugins</groupId>
	  <artifactId>maven-compiler-plugin</artifactId>
	  <version>3.8.0</version>
	  <configuration>
		<source>1.8</source>
		<target>1.8</target>
	  </configuration>
	</plugin>
  </plugins>
</build>

配置数据库的配置信息,application.xml:

mysql.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
mysql.datasource.username=root
mysql.datasource.password=root
mysql.datasource.driverClassName=com.mysql.cj.jdbc.Driver

读取数据库配置文件 DataSourceUtil.java:

public class DataSourceUtil {
    public static String url(){
        return PropertyUtil.getInstance().getString("mysql.datasource.url");
  }
    public static String userName(){
        return PropertyUtil.getInstance().getString("mysql.datasource.username");
  }
    public static String passWord(){
        return PropertyUtil.getInstance().getString("mysql.datasource.password");
  }
    public static String driverClassName(){
        return PropertyUtil.getInstance().getString("mysql.datasource.driverClassName");
  }
}

PropertyUtil.java:

public class PropertyUtil {
    private PropertyUtil() {
    }
    private static class SingleTonHoler {
        private static ResourceBundle INSTANCE = ResourceBundle.getBundle("application");
  }
    public static ResourceBundle getInstance() {
        return SingleTonHoler.INSTANCE;
  }
}

MySQLService.java:

import cn.grgpay.analyze.util.DataSourceUtil;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*;

import java.io.Serializable;
import java.util.*;

public class MySQLService implements Serializable {
    private static final long serialVersionUID = 396720719322480114L;

    public static void main(String[] args) {
        readMySQL();

  }

    private static void readMySQL() {
        SparkSession session = SparkSession.builder().master("local[*]").appName("readMySQLToDay").config("spark.sql.warehouse.dir", "./spark-warehouse").getOrCreate();
  SQLContext sqlContext = session.sqlContext();

  Properties connectionProperties = new Properties();
  connectionProperties.put("user", DataSourceUtil.userName());
  connectionProperties.put("password", DataSourceUtil.passWord());
  connectionProperties.put("driver", DataSourceUtil.driverClassName());
 long start = System.currentTimeMillis();

  // 读取person表中所有数据
  Dataset data = sqlContext.read().jdbc(DataSourceUtil.url(), "person", connectionProperties).select("*");
 long end = System.currentTimeMillis();
  System.out.println("读取数据库数据【"+data.count()+"】条,耗时:"+((end-start)/1000));
  // 过滤出性别为男的数据
  Dataset maleData = data.filter(new FilterFunction() {
            private static final long serialVersionUID = -6182357065815734414L;

  @Override
  public boolean call(Row value) {
                String sex = value.getAs("sex");
 return sex.equals("男");
  }
        });
// 得到性别为男的身高数据
  Dataset maleHeightData = maleData.map(new MapFunction, Integer>() {
            private static final long serialVersionUID = -7881663810003682651L;

  @Override
  public Integer call(Row value) {
                return value.getAs("height");
  }
        }, Encoders.INT());

  // 全部男性身高相加
  Integer maleReduce = maleHeightData.reduce(new ReduceFunction() {
            private static final long serialVersionUID = -7419948477276929434L;

  @Override
  public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
  }
        });

  Dataset maleHeight = maleData.sort(maleData.col("height").desc());//男性身高倒序排序
  Dataset lowerMaleHeight = maleData.sort(maleData.col("height").asc());//男性身高升序排序
  System.out.println("男性平均身高:"+(maleReduce/maleHeightData.count())+",最高的男性身高为:" + maleHeight.first() + ",最矮:" + lowerMaleHeight.first());

// 过滤出性别为女的数据
  Dataset feMaleData = data.filter(new FilterFunction() {
            private static final long serialVersionUID = 6593222075687505570L;

  @Override
  public boolean call(Row value) {
                String sex = value.getAs("sex");
 return sex.equals("女");
  }
        });
// 得到性别为女的身高数据
  Dataset femaleHeightData = feMaleData.map(new MapFunction, Integer>() {
            private static final long serialVersionUID = -7881663810003682651L;

  @Override
  public Integer call(Row value) {
                return value.getAs("height");
  }
        }, Encoders.INT());

  // 全部女性身高相加
  Integer femaleReduce = femaleHeightData.reduce(new ReduceFunction() {
            private static final long serialVersionUID = -7419948477276929434L;

  @Override
  public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
  }
        });

  Dataset femaleHeight = feMaleData.sort(feMaleData.col("height").desc());//女性身高倒序排序
  Dataset lowerFemaleHeight = feMaleData.sort(feMaleData.col("height").asc());//女性身高升序排序
  System.out.println("女性平均身高:"+(femaleReduce/femaleHeightData.count())+",最高的女性身高为:" + femaleHeight.first() + ",最矮:" + lowerFemaleHeight.first());

  System.out.println("计算耗时:"+((System.currentTimeMillis()-end)/1000));

  }
}

如果本地有安装 Spark 服务,可以直接右键 Run 这个 main 函数即可计算出结果。

下面介绍一下提交任务到 Spark 集群中运行。

其实提交任务到 Spark 集群也很简单,先 maven 打包出 jar,把 jar 包上传到 Spark 的 Master 节点的任意目录,执行命令:

spark-submit --master spark://spark1:7077 --class xxx.yyy.MySQLService /usr/local/apps/test-1.0.jar

--master spark://spark1:7077
  这个是指定master节点的地址
--class xxx.yyy.MySQLService
  这个是指定执行那个类的main函数
/usr/local/apps/test-1.0.jar
  这个是指定jar包的路径

这样就可以提交任务到 Spark 集群里了。






扫一扫有惊喜: [![imagepng](http://itechor.top/solo/upload/bb791a58c3a84193b7f643b6849482c5_image.png) ](http://ym0214.com)
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 548 关注
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    89 引用 • 113 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • 创业

    你比 99% 的人都优秀么?

    82 引用 • 1398 回帖
  • GitBook

    GitBook 使您的团队可以轻松编写和维护高质量的文档。 分享知识,提高团队的工作效率,让用户满意。

    3 引用 • 8 回帖
  • 负能量

    上帝为你关上了一扇门,然后就去睡觉了....努力不一定能成功,但不努力一定很轻松 (° ー °〃)

    85 引用 • 1201 回帖 • 449 关注
  • Webswing

    Webswing 是一个能将任何 Swing 应用通过纯 HTML5 运行在浏览器中的 Web 服务器,详细介绍请看 将 Java Swing 应用变成 Web 应用

    1 引用 • 15 回帖 • 636 关注
  • Markdown

    Markdown 是一种轻量级标记语言,用户可使用纯文本编辑器来排版文档,最终通过 Markdown 引擎将文档转换为所需格式(比如 HTML、PDF 等)。

    163 引用 • 1450 回帖
  • 微信

    腾讯公司 2011 年 1 月 21 日推出的一款手机通讯软件。用户可以通过摇一摇、搜索号码、扫描二维码等添加好友和关注公众平台,同时可以将自己看到的精彩内容分享到微信朋友圈。

    129 引用 • 793 回帖
  • Latke

    Latke 是一款以 JSON 为主的 Java Web 框架。

    70 引用 • 532 回帖 • 712 关注
  • jsoup

    jsoup 是一款 Java 的 HTML 解析器,可直接解析某个 URL 地址、HTML 文本内容。它提供了一套非常省力的 API,可通过 DOM,CSS 以及类似于 jQuery 的操作方法来取出和操作数据。

    6 引用 • 1 回帖 • 462 关注
  • 黑曜石

    黑曜石是一款强大的知识库工具,支持本地 Markdown 文件编辑,支持双向链接和关系图。

    A second brain, for you, forever.

    10 引用 • 85 回帖
  • SMTP

    SMTP(Simple Mail Transfer Protocol)即简单邮件传输协议,它是一组用于由源地址到目的地址传送邮件的规则,由它来控制信件的中转方式。SMTP 协议属于 TCP/IP 协议簇,它帮助每台计算机在发送或中转信件时找到下一个目的地。

    4 引用 • 18 回帖 • 588 关注
  • 酷鸟浏览器

    安全 · 稳定 · 快速
    为跨境从业人员提供专业的跨境浏览器

    3 引用 • 59 回帖 • 25 关注
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 21 关注
  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    89 引用 • 113 回帖
  • Ngui

    Ngui 是一个 GUI 的排版显示引擎和跨平台的 GUI 应用程序开发框架,基于
    Node.js / OpenGL。目标是在此基础上开发 GUI 应用程序可拥有开发 WEB 应用般简单与速度同时兼顾 Native 应用程序的性能与体验。

    7 引用 • 9 回帖 • 345 关注
  • 支付宝

    支付宝是全球领先的独立第三方支付平台,致力于为广大用户提供安全快速的电子支付/网上支付/安全支付/手机支付体验,及转账收款/水电煤缴费/信用卡还款/AA 收款等生活服务应用。

    29 引用 • 347 回帖
  • LaTeX

    LaTeX(音译“拉泰赫”)是一种基于 ΤΕΧ 的排版系统,由美国计算机学家莱斯利·兰伯特(Leslie Lamport)在 20 世纪 80 年代初期开发,利用这种格式,即使使用者没有排版和程序设计的知识也可以充分发挥由 TeX 所提供的强大功能,能在几天,甚至几小时内生成很多具有书籍质量的印刷品。对于生成复杂表格和数学公式,这一点表现得尤为突出。因此它非常适用于生成高印刷质量的科技和数学类文档。

    9 引用 • 32 回帖 • 166 关注
  • 旅游

    希望你我能在旅途中找到人生的下一站。

    85 引用 • 895 回帖 • 1 关注
  • JetBrains

    JetBrains 是一家捷克的软件开发公司,该公司位于捷克的布拉格,并在俄国的圣彼得堡及美国麻州波士顿都设有办公室,该公司最为人所熟知的产品是 Java 编程语言开发撰写时所用的集成开发环境:IntelliJ IDEA

    18 引用 • 54 回帖 • 1 关注
  • Q&A

    提问之前请先看《提问的智慧》,好的问题比好的答案更有价值。

    6551 引用 • 29424 回帖 • 245 关注
  • 宕机

    宕机,多指一些网站、游戏、网络应用等服务器一种区别于正常运行的状态,也叫“Down 机”、“当机”或“死机”。宕机状态不仅仅是指服务器“挂掉了”、“死机了”状态,也包括服务器假死、停用、关闭等一些原因而导致出现的不能够正常运行的状态。

    13 引用 • 82 回帖 • 38 关注
  • OkHttp

    OkHttp 是一款 HTTP & HTTP/2 客户端库,专为 Android 和 Java 应用打造。

    16 引用 • 6 回帖 • 53 关注
  • Sym

    Sym 是一款用 Java 实现的现代化社区(论坛/BBS/社交网络/博客)系统平台。

    下一代的社区系统,为未来而构建

    523 引用 • 4581 回帖 • 690 关注
  • SpaceVim

    SpaceVim 是一个社区驱动的模块化 vim/neovim 配置集合,以模块的方式组织管理插件以
    及相关配置,为不同的语言开发量身定制了相关的开发模块,该模块提供代码自动补全,
    语法检查、格式化、调试、REPL 等特性。用户仅需载入相关语言的模块即可得到一个开箱
    即用的 Vim-IDE。

    3 引用 • 31 回帖 • 71 关注
  • 单点登录

    单点登录(Single Sign On)是目前比较流行的企业业务整合的解决方案之一。SSO 的定义是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系统。

    9 引用 • 25 回帖 • 3 关注
  • SQLite

    SQLite 是一个进程内的库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是全世界使用最为广泛的数据库引擎。

    4 引用 • 7 回帖 • 4 关注
  • Maven

    Maven 是基于项目对象模型(POM)、通过一小段描述信息来管理项目的构建、报告和文档的软件项目管理工具。

    185 引用 • 318 回帖 • 346 关注