Spark 学习之提交任务(六)

本贴最后更新于 2183 天前,其中的信息可能已经天翻地覆

本篇文章主要记录 Spark 的任务提交到集群上的过程

http://itechor.top/solo/articles/2018/12/17/1545016407680.html 这篇文章搭建好的集群环境上,进行任务的提交运行。

新建一个 maven 项目,以统计用户身高性别等为主,pom.xml 添加以下依赖:

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  <java.version>1.8></java.version>
  <spark.version>2.4.0></spark.version>
</properties>

<dependencies>
  <dependency>
	<groupId>com.thoughtworks.paranamer</groupId>
	<artifactId>paranamer</artifactId>
	<version>2.8</version>
  </dependency>
  <dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.12</artifactId>
	<version>${spark.version}</version>
  </dependency>
  <dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.12</artifactId>
	<version>${spark.version}</version>
  </dependency>
  <dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>8.0.13</version>
  </dependency>
</dependencies>

<build>
  <plugins>
	<plugin>
	  <groupId>org.apache.maven.plugins</groupId>
	  <artifactId>maven-assembly-plugin</artifactId>
	  <version>3.1.0</version>
	  <configuration>
		<descriptorRefs>
		  <descriptorRef>jar-with-dependencies</descriptorRef>
		</descriptorRefs>
		<archive>
		  <manifest>
			<mainClass>xxx.yyy</mainClass>
		  </manifest>
		</archive>
	  </configuration>
	  <executions>
		<execution>
		  <id>make-assembly</id>
		  <phase>package</phase>
		  <goals>
			<goal>single</goal>
		  </goals>
		</execution>
	  </executions>
	</plugin>
	<plugin>
	  <groupId>org.apache.maven.plugins</groupId>
	  <artifactId>maven-compiler-plugin</artifactId>
	  <version>3.8.0</version>
	  <configuration>
		<source>1.8</source>
		<target>1.8</target>
	  </configuration>
	</plugin>
  </plugins>
</build>

配置数据库的配置信息,application.xml:

mysql.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
mysql.datasource.username=root
mysql.datasource.password=root
mysql.datasource.driverClassName=com.mysql.cj.jdbc.Driver

读取数据库配置文件 DataSourceUtil.java:

public class DataSourceUtil {
    public static String url(){
        return PropertyUtil.getInstance().getString("mysql.datasource.url");
  }
    public static String userName(){
        return PropertyUtil.getInstance().getString("mysql.datasource.username");
  }
    public static String passWord(){
        return PropertyUtil.getInstance().getString("mysql.datasource.password");
  }
    public static String driverClassName(){
        return PropertyUtil.getInstance().getString("mysql.datasource.driverClassName");
  }
}

PropertyUtil.java:

public class PropertyUtil {
    private PropertyUtil() {
    }
    private static class SingleTonHoler {
        private static ResourceBundle INSTANCE = ResourceBundle.getBundle("application");
  }
    public static ResourceBundle getInstance() {
        return SingleTonHoler.INSTANCE;
  }
}

MySQLService.java:

import cn.grgpay.analyze.util.DataSourceUtil;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*;

import java.io.Serializable;
import java.util.*;

public class MySQLService implements Serializable {
    private static final long serialVersionUID = 396720719322480114L;

    public static void main(String[] args) {
        readMySQL();

  }

    private static void readMySQL() {
        SparkSession session = SparkSession.builder().master("local[*]").appName("readMySQLToDay").config("spark.sql.warehouse.dir", "./spark-warehouse").getOrCreate();
  SQLContext sqlContext = session.sqlContext();

  Properties connectionProperties = new Properties();
  connectionProperties.put("user", DataSourceUtil.userName());
  connectionProperties.put("password", DataSourceUtil.passWord());
  connectionProperties.put("driver", DataSourceUtil.driverClassName());
 long start = System.currentTimeMillis();

  // 读取person表中所有数据
  Dataset data = sqlContext.read().jdbc(DataSourceUtil.url(), "person", connectionProperties).select("*");
 long end = System.currentTimeMillis();
  System.out.println("读取数据库数据【"+data.count()+"】条,耗时:"+((end-start)/1000));
  // 过滤出性别为男的数据
  Dataset maleData = data.filter(new FilterFunction() {
            private static final long serialVersionUID = -6182357065815734414L;

  @Override
  public boolean call(Row value) {
                String sex = value.getAs("sex");
 return sex.equals("男");
  }
        });
// 得到性别为男的身高数据
  Dataset maleHeightData = maleData.map(new MapFunction, Integer>() {
            private static final long serialVersionUID = -7881663810003682651L;

  @Override
  public Integer call(Row value) {
                return value.getAs("height");
  }
        }, Encoders.INT());

  // 全部男性身高相加
  Integer maleReduce = maleHeightData.reduce(new ReduceFunction() {
            private static final long serialVersionUID = -7419948477276929434L;

  @Override
  public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
  }
        });

  Dataset maleHeight = maleData.sort(maleData.col("height").desc());//男性身高倒序排序
  Dataset lowerMaleHeight = maleData.sort(maleData.col("height").asc());//男性身高升序排序
  System.out.println("男性平均身高:"+(maleReduce/maleHeightData.count())+",最高的男性身高为:" + maleHeight.first() + ",最矮:" + lowerMaleHeight.first());

// 过滤出性别为女的数据
  Dataset feMaleData = data.filter(new FilterFunction() {
            private static final long serialVersionUID = 6593222075687505570L;

  @Override
  public boolean call(Row value) {
                String sex = value.getAs("sex");
 return sex.equals("女");
  }
        });
// 得到性别为女的身高数据
  Dataset femaleHeightData = feMaleData.map(new MapFunction, Integer>() {
            private static final long serialVersionUID = -7881663810003682651L;

  @Override
  public Integer call(Row value) {
                return value.getAs("height");
  }
        }, Encoders.INT());

  // 全部女性身高相加
  Integer femaleReduce = femaleHeightData.reduce(new ReduceFunction() {
            private static final long serialVersionUID = -7419948477276929434L;

  @Override
  public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
  }
        });

  Dataset femaleHeight = feMaleData.sort(feMaleData.col("height").desc());//女性身高倒序排序
  Dataset lowerFemaleHeight = feMaleData.sort(feMaleData.col("height").asc());//女性身高升序排序
  System.out.println("女性平均身高:"+(femaleReduce/femaleHeightData.count())+",最高的女性身高为:" + femaleHeight.first() + ",最矮:" + lowerFemaleHeight.first());

  System.out.println("计算耗时:"+((System.currentTimeMillis()-end)/1000));

  }
}

如果本地有安装 Spark 服务,可以直接右键 Run 这个 main 函数即可计算出结果。

下面介绍一下提交任务到 Spark 集群中运行。

其实提交任务到 Spark 集群也很简单,先 maven 打包出 jar,把 jar 包上传到 Spark 的 Master 节点的任意目录,执行命令:

spark-submit --master spark://spark1:7077 --class xxx.yyy.MySQLService /usr/local/apps/test-1.0.jar

--master spark://spark1:7077
  这个是指定master节点的地址
--class xxx.yyy.MySQLService
  这个是指定执行那个类的main函数
/usr/local/apps/test-1.0.jar
  这个是指定jar包的路径

这样就可以提交任务到 Spark 集群里了。






扫一扫有惊喜: [![imagepng](http://itechor.top/solo/upload/bb791a58c3a84193b7f643b6849482c5_image.png) ](http://ym0214.com)
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 560 关注
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    93 引用 • 113 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Q&A

    提问之前请先看《提问的智慧》,好的问题比好的答案更有价值。

    8447 引用 • 38484 回帖 • 155 关注
  • LaTeX

    LaTeX(音译“拉泰赫”)是一种基于 ΤΕΧ 的排版系统,由美国计算机学家莱斯利·兰伯特(Leslie Lamport)在 20 世纪 80 年代初期开发,利用这种格式,即使使用者没有排版和程序设计的知识也可以充分发挥由 TeX 所提供的强大功能,能在几天,甚至几小时内生成很多具有书籍质量的印刷品。对于生成复杂表格和数学公式,这一点表现得尤为突出。因此它非常适用于生成高印刷质量的科技和数学类文档。

    12 引用 • 54 回帖 • 49 关注
  • TGIF

    Thank God It's Friday! 感谢老天,总算到星期五啦!

    288 引用 • 4485 回帖 • 663 关注
  • 又拍云

    又拍云是国内领先的 CDN 服务提供商,国家工信部认证通过的“可信云”,乌云众测平台认证的“安全云”,为移动时代的创业者提供新一代的 CDN 加速服务。

    21 引用 • 37 回帖 • 548 关注
  • CloudFoundry

    Cloud Foundry 是 VMware 推出的业界第一个开源 PaaS 云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题。

    5 引用 • 18 回帖 • 172 关注
  • JRebel

    JRebel 是一款 Java 虚拟机插件,它使得 Java 程序员能在不进行重部署的情况下,即时看到代码的改变对一个应用程序带来的影响。

    26 引用 • 78 回帖 • 672 关注
  • CongSec

    本标签主要用于分享网络空间安全专业的学习笔记

    1 引用 • 1 回帖 • 15 关注
  • 学习

    “梦想从学习开始,事业从实践起步” —— 习近平

    171 引用 • 512 回帖
  • TensorFlow

    TensorFlow 是一个采用数据流图(data flow graphs),用于数值计算的开源软件库。节点(Nodes)在图中表示数学操作,图中的线(edges)则表示在节点间相互联系的多维数据数组,即张量(tensor)。

    20 引用 • 19 回帖 • 1 关注
  • Logseq

    Logseq 是一个隐私优先、开源的知识库工具。

    Logseq is a joyful, open-source outliner that works on top of local plain-text Markdown and Org-mode files. Use it to write, organize and share your thoughts, keep your to-do list, and build your own digital garden.

    6 引用 • 63 回帖 • 5 关注
  • 分享

    有什么新发现就分享给大家吧!

    248 引用 • 1795 回帖 • 1 关注
  • 运维

    互联网运维工作,以服务为中心,以稳定、安全、高效为三个基本点,确保公司的互联网业务能够 7×24 小时为用户提供高质量的服务。

    149 引用 • 257 回帖
  • WiFiDog

    WiFiDog 是一套开源的无线热点认证管理工具,主要功能包括:位置相关的内容递送;用户认证和授权;集中式网络监控。

    1 引用 • 7 回帖 • 592 关注
  • WebSocket

    WebSocket 是 HTML5 中定义的一种新协议,它实现了浏览器与服务器之间的全双工通信(full-duplex)。

    48 引用 • 206 回帖 • 318 关注
  • 工具

    子曰:“工欲善其事,必先利其器。”

    288 引用 • 734 回帖 • 2 关注
  • 程序员

    程序员是从事程序开发、程序维护的专业人员。

    574 引用 • 3533 回帖
  • Linux

    Linux 是一套免费使用和自由传播的类 Unix 操作系统,是一个基于 POSIX 和 Unix 的多用户、多任务、支持多线程和多 CPU 的操作系统。它能运行主要的 Unix 工具软件、应用程序和网络协议,并支持 32 位和 64 位硬件。Linux 继承了 Unix 以网络为核心的设计思想,是一个性能稳定的多用户网络操作系统。

    946 引用 • 943 回帖
  • SpaceVim

    SpaceVim 是一个社区驱动的模块化 vim/neovim 配置集合,以模块的方式组织管理插件以
    及相关配置,为不同的语言开发量身定制了相关的开发模块,该模块提供代码自动补全,
    语法检查、格式化、调试、REPL 等特性。用户仅需载入相关语言的模块即可得到一个开箱
    即用的 Vim-IDE。

    3 引用 • 31 回帖 • 104 关注
  • 数据库

    据说 99% 的性能瓶颈都在数据库。

    343 引用 • 723 回帖
  • Postman

    Postman 是一款简单好用的 HTTP API 调试工具。

    4 引用 • 3 回帖 • 7 关注
  • 链滴

    链滴是一个记录生活的地方。

    记录生活,连接点滴

    156 引用 • 3792 回帖
  • Webswing

    Webswing 是一个能将任何 Swing 应用通过纯 HTML5 运行在浏览器中的 Web 服务器,详细介绍请看 将 Java Swing 应用变成 Web 应用

    1 引用 • 15 回帖 • 637 关注
  • HHKB

    HHKB 是富士通的 Happy Hacking 系列电容键盘。电容键盘即无接点静电电容式键盘(Capacitive Keyboard)。

    5 引用 • 74 回帖 • 478 关注
  • jQuery

    jQuery 是一套跨浏览器的 JavaScript 库,强化 HTML 与 JavaScript 之间的操作。由 John Resig 在 2006 年 1 月的 BarCamp NYC 上释出第一个版本。全球约有 28% 的网站使用 jQuery,是非常受欢迎的 JavaScript 库。

    63 引用 • 134 回帖 • 724 关注
  • 快应用

    快应用 是基于手机硬件平台的新型应用形态;标准是由主流手机厂商组成的快应用联盟联合制定;快应用标准的诞生将在研发接口、能力接入、开发者服务等层面建设标准平台;以平台化的生态模式对个人开发者和企业开发者全品类开放。

    15 引用 • 127 回帖
  • Solidity

    Solidity 是一种智能合约高级语言,运行在 [以太坊] 虚拟机(EVM)之上。它的语法接近于 JavaScript,是一种面向对象的语言。

    3 引用 • 18 回帖 • 401 关注
  • 域名

    域名(Domain Name),简称域名、网域,是由一串用点分隔的名字组成的 Internet 上某一台计算机或计算机组的名称,用于在数据传输时标识计算机的电子方位(有时也指地理位置)。

    43 引用 • 208 回帖