python 从 0 编写 spark 程序

本贴最后更新于 1496 天前,其中的信息可能已经时移俗易

导读:从 0 开始搭建 spark 环境,了解 spark 里的 RDD 和 DataFrame,并用 python 编写 spark 程序,实现分布式数据读取,ML 训练。

一、从 0 搭建 spark

1、从官网下载 spark 包,并解压到自定义目录,配置环境变量。
2、mater 启动:进入到 spark 解压目录,输入命令:./sbin/start-master.sh -h master_ip
3、slave 启动:./sbin/start-slave.sh

二、RDD 和 DataFrame

了解 RDD 和 DataFrame 关系时,不放从 Spark 的两个机器学习的库 ML 和 MLlib 来接入原文链接

  1. 目前常用的机器学习功能 2 个库都能满足需求;
  2. spark 官方推荐使用 ML, 因为在 spark3.0 之后,将会废弃 MLlib,全面的基于 ML。因为 ml 操作的对象是 DataFrame,操作起来会比 RDD 方便很多。所以,建议新接触 spark 的同学可以直接用 ml 的方式;
  3. ML 主要操作的是 DataFrame, 而 MLlib 操作的是 RDD,也就是说二者面向的数据集不一样。相比于 MLlib 在 RDD 提供的基础操作,ML 在 DataFrame 上的抽象级别更高,数据和操作耦合度更低;
  4. DataFrame 和 RDD 什么关系?DataFrame 是 Dataset 的子集,也就是 Dataset[Row], 而 DataSet 是对 RDD 的封装,对 SQL 之类的操作做了很多优化;
  5. 相比于 MLlib 在 RDD 提供的基础操作,ML 在 DataFrame 上的抽象级别更高,数据和操作耦合度更低;
  6. ML 中的操作可以使用 pipeline, 跟 sklearn 一样,可以把很多操作(算法/特征提取/特征转换)以管道的形式串起来,然后让数据在这个管道中流动。大家可以脑补一下 Linux 管道在做任务组合时有多么方便;
  7. ML 中无论是什么模型,都提供了统一的算法操作接口,比如模型训练都是 fit;不像 MLlib 中不同模型会有各种各样的 train;
  8. MLlib 在 spark2.0 之后进入维护状态, 这个状态通常只修复 BUG 不增加新功能;
  9. ML 中的随机森林支持更多的功能:包括重要度、预测概率输出等,而 MLlib 不支持。

三、python 操作 spark

1、 什么是 PySpark

Apache Spark 是用 Scala 编程语言编写的。为了用 Spark 支持 Python,Apache Spark 社区发布了一个工具 PySpark。使用 PySpark,您也可以使用 Python 编程语言处理 RDD。正是由于一个名为 Py4j 的库,他们才能实现这一目标。

2 、PySpark 实现随机森林

# -*- coding:utf-8 -*-


from pyspark import SparkConf
from pyspark.sql import SparkSession
import os
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"


# 设置master地址,worker使用的内存及cpu
appname = "test1"
master = "spark://192.168.255.168:7077"
conf = SparkConf().setAppName(appname).setMaster(master)  # spark资源配置
conf.set('spark.driver.maxResultSize', '182g')
conf.set('spark.executor.memory', '60g')
conf.set('spark.cores.max', 30)
conf.set("spark.executor.cores", '8')

#SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。
spark = SparkSession.builder.config(conf=conf).getOrCreate()

#SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。
sc = spark.sparkContext

# 对于每个其他的API,我们需要使用不同的context,例如,例如:Streming
# ssc = StreamingContext(sc, 1)


# 将csv文件读入rdd,并转成spark中的dataframe
file = "/Ipynb/Gum/loop/demo20191204_1.csv"
lines = sc.textFile(file)
df0=spark.read.csv(lines, header=True, encoding="utf-8",maxColumns=50000,inferSchema=True)
print("读取完成")

# 处理特征,第一列为target,第二列及后面数据为feature
old_columns_names = df0.columns
df0 = df0.withColumnRenamed(old_columns_names[0], 'label')
vecAss = VectorAssembler(inputCols=old_columns_names[1:],handleInvalid="keep", outputCol='features')
df0 = vecAss.transform(df0)
dfi = df0.select(['label', 'features'])
print(dfi.show(3))

# 进行随机森林训练
blor = RandomForestClassifier(numTrees=3, maxDepth=2, featuresCol="features", labelCol="label", seed=42)
blorModel = blor.fit(dfi)
print("训练完成")
result = blorModel.transform(dfi)
result.filter(result.label == result.prediction).count()/result.count()
sc.stop()
print(result.show(3))

3、ui 界面

输入以下命令,开始运行:

spark-submit demo.py

查看 ui 界面,可以发现两个 worker,同时运行,cpu 核和内存是对应的,在代码中可以配置。
截屏 20191209 下午 12.25.09.png

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 549 关注
  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    535 引用 • 672 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...