导读:从 0 开始搭建 spark 环境,了解 spark 里的 RDD 和 DataFrame,并用 python 编写 spark 程序,实现分布式数据读取,ML 训练。
一、从 0 搭建 spark
1、从官网下载 spark 包,并解压到自定义目录,配置环境变量。
2、mater 启动:进入到 spark 解压目录,输入命令:./sbin/start-master.sh -h master_ip
3、slave 启动:./sbin/start-slave.sh
二、RDD 和 DataFrame
了解 RDD 和 DataFrame 关系时,不放从 Spark 的两个机器学习的库 ML 和 MLlib 来接入原文链接
- 目前常用的机器学习功能 2 个库都能满足需求;
- spark 官方推荐使用 ML, 因为在 spark3.0 之后,将会废弃 MLlib,全面的基于 ML。因为 ml 操作的对象是 DataFrame,操作起来会比 RDD 方便很多。所以,建议新接触 spark 的同学可以直接用 ml 的方式;
- ML 主要操作的是 DataFrame, 而 MLlib 操作的是 RDD,也就是说二者面向的数据集不一样。相比于 MLlib 在 RDD 提供的基础操作,ML 在 DataFrame 上的抽象级别更高,数据和操作耦合度更低;
- DataFrame 和 RDD 什么关系?DataFrame 是 Dataset 的子集,也就是 Dataset[Row], 而 DataSet 是对 RDD 的封装,对 SQL 之类的操作做了很多优化;
- 相比于 MLlib 在 RDD 提供的基础操作,ML 在 DataFrame 上的抽象级别更高,数据和操作耦合度更低;
- ML 中的操作可以使用 pipeline, 跟 sklearn 一样,可以把很多操作(算法/特征提取/特征转换)以管道的形式串起来,然后让数据在这个管道中流动。大家可以脑补一下 Linux 管道在做任务组合时有多么方便;
- ML 中无论是什么模型,都提供了统一的算法操作接口,比如模型训练都是 fit;不像 MLlib 中不同模型会有各种各样的 train;
- MLlib 在 spark2.0 之后进入维护状态, 这个状态通常只修复 BUG 不增加新功能;
- ML 中的随机森林支持更多的功能:包括重要度、预测概率输出等,而 MLlib 不支持。
三、python 操作 spark
1、 什么是 PySpark
Apache Spark 是用 Scala 编程语言编写的。为了用 Spark 支持 Python,Apache Spark 社区发布了一个工具 PySpark。使用 PySpark,您也可以使用 Python 编程语言处理 RDD。正是由于一个名为 Py4j 的库,他们才能实现这一目标。
2 、PySpark 实现随机森林
# -*- coding:utf-8 -*-
from pyspark import SparkConf
from pyspark.sql import SparkSession
import os
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
# 设置master地址,worker使用的内存及cpu
appname = "test1"
master = "spark://192.168.255.168:7077"
conf = SparkConf().setAppName(appname).setMaster(master) # spark资源配置
conf.set('spark.driver.maxResultSize', '182g')
conf.set('spark.executor.memory', '60g')
conf.set('spark.cores.max', 30)
conf.set("spark.executor.cores", '8')
#SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。
spark = SparkSession.builder.config(conf=conf).getOrCreate()
#SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。
sc = spark.sparkContext
# 对于每个其他的API,我们需要使用不同的context,例如,例如:Streming
# ssc = StreamingContext(sc, 1)
# 将csv文件读入rdd,并转成spark中的dataframe
file = "/Ipynb/Gum/loop/demo20191204_1.csv"
lines = sc.textFile(file)
df0=spark.read.csv(lines, header=True, encoding="utf-8",maxColumns=50000,inferSchema=True)
print("读取完成")
# 处理特征,第一列为target,第二列及后面数据为feature
old_columns_names = df0.columns
df0 = df0.withColumnRenamed(old_columns_names[0], 'label')
vecAss = VectorAssembler(inputCols=old_columns_names[1:],handleInvalid="keep", outputCol='features')
df0 = vecAss.transform(df0)
dfi = df0.select(['label', 'features'])
print(dfi.show(3))
# 进行随机森林训练
blor = RandomForestClassifier(numTrees=3, maxDepth=2, featuresCol="features", labelCol="label", seed=42)
blorModel = blor.fit(dfi)
print("训练完成")
result = blorModel.transform(dfi)
result.filter(result.label == result.prediction).count()/result.count()
sc.stop()
print(result.show(3))
3、ui 界面
输入以下命令,开始运行:
spark-submit demo.py
查看 ui 界面,可以发现两个 worker,同时运行,cpu 核和内存是对应的,在代码中可以配置。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于