炼数成金 门户 大数据 Spark 查看内容

一篇文章搞懂 PySpark MLlib

2018-9-4 15:30| 发布者: 炼数成金_小数| 查看: 29018| 评论: 0|原作者: 祝威廉|来自: Spark技术日报

摘要: 我们的目标,是要设计这么一个系统,当把一张拥有很多字段的表给系统,系统自动抽取出特征,这些特征可以给机器学习算法如SVM,贝叶斯之类的,也可以适配深度学习的要求。初看起来显得野心太大,但也不是不可能,我们 ...

Python 模型 Hadoop Spark MLlib

001、PySpark MLlib 基础
PySpark MLlib 单独成篇是因为,MLlib 提供了大量和特征工程相关的工具,譬如缺失值处理,字符转化为数字,词向量工具等,这些可以都是使用Scala语言分布式实现,并且可以使用Python来调用。在这个章节,我们会用一个实际项目来练手,可能有点难度,但是如果细加揣摩的话,会有巨大的帮助。

在进行代码练习之前,我们需要做一些准备工作。

//下载项目git clone https://github.com/allwefantasy/spark-deep-learning.git .//切换到release 分支git checkout release//进入spark-deep-learning目录运行如下命令build/sbt assembly
如果没有大问题,应该就有了下面的包了:

target/scala-2.11/spark-deep-learning-assembly-0.2.0-spark2.1.jar
如果这一步骤编译太久或者你觉得太麻烦了,也是可以跳过的,可以感受下Spark MLlib的用法。

我们的目标,是要设计这么一个系统,当把一张拥有很多字段的表给系统,系统自动抽取出特征,这些特征可以给机器学习算法如SVM,贝叶斯之类的,也可以适配深度学习的要求。初看起来显得野心太大,但也不是不可能,我们总是需要一些挑战的。

为了实现自动特征化,核心是四点:类型,规则,统计,先验。

类型,所谓类型指的是Spark DataFrame 的数据是强类型的,常见类型有String,Int, Double, Float, Array, VectorUDF等,他们其实可以给我们提供一定的信息,比如String一般而言有两种可能性:

需要分词的字段,一般而言会转化tf/idf 或者word sequence(LSTM/CNN)形式。 不需要分词的字段,一般其实就是分类字段。

Int 我们可以求一个distinct值,如果很少,很可能是一个分类字段,比如性别,年龄等。Double, Float等则可能是连续的,比如可能是金额等。

规则,字段的名字也能给我们一定的启发,通常如果类型是String,并且名字还是title,body,sentence,summary之类的,一般是需要分词的字段。

Int类型而且还是age,gender之类的名字,则必定是个分类字段。在类型的基础上,让我们更好的确认,该如何特征化某个字段。

统计,当规则无法给我们帮助时,我们仅仅知道某个字段是一个int,我们该怎么办,这个时候统计就起作用了,如果某个字段只有少数几个类型,比如性别 ,统计只有两种可能性,这么少的可能性,那我们就可以对待为分类属性,可以进行one-hot化了。

如果发现有几十万个种类,可能就是售价之类的,那么就自然当做连续值即可,并且我们可能需要做一些缺失值处理。

先验,当然,我们可以通过人工干预,比如明确告知系统哪些是需要分词的字段,哪些是字段需要离散化,这些作为系统的先验知识。系统自动识别这种规则,然后自动进行处理,你需要做的就是告知哪些字段要做什么处理。

因为本小节的目标是为了为了演示如何利用PySpark MLlib完成这个强大的任务,为了简化起见,在本文中,我们只做下面提及的规则集,更复杂的规则集,在我上面提到的GitHub项目中有。

对浮点类型做缺失值处理
String类型字段如果是分类字段,则转化为one-hot向量
需要分词的String类型字段,word sequence & word embedding形态。

其中第三条,可以保证我们处理完的数据直接丢给Tensorflow进行模型训练。具体细节我们会在后续的章节里面阐述。

对浮点类型做缺失值处理,这个比较简单,我们先说下大家需要导入哪些PySpark的包:

from pyspark.ml import Estimator, Transformerfrom pyspark.ml import Pipelinefrom pyspark.ml.feature import Imputer, Param, Params, TypeConverters, VectorAssembler, VectorIndexer, Tokenizer, \
    HashingTF, OneHotEncoder, QuantileDiscretizer, Normalizerfrom pyspark.ml.linalg import VectorUDT, Vectors, DenseVectorfrom pyspark.SQL.types import *import pyspark.sql.functions as fn
接着我想把一个DataFrame的某一列进行缺失值处理,依然采用之前的数据集,但是内容需要做些调整:

a b c,1.0a b,c,3.0d,4.0
df = spark.read.csv(    "YOUR-PATH/a.csv",
    encoding="utf-8",
    header=False,
    schema=StructType(
        [StructField("text", StringType()),
         StructField("index", FloatType())]))
imputer = Imputer(inputCols=["index"], outputCols=["index_wow"])
df = imputer.fit(df).transform(df)
df.show()
执行结果如下:


这里我们看到,核心只有两行代码,构建Imputer,调用对应的fit,transform方法形成一个新的表(df)就可以了。

PySpark MLlib里,有几个概念,需要了解下:

Pipline 组合Estimator,Transformer完成一个完整的数据处理过程。
Estimator 你可以理解为算法的训练过程。
Transformer 模型进行预测
Parameter, Estimator ,Transformer共享的参数配置体系

这里比较核心的概念是Estimator,Transformer。Estimator核心方法是fit, Transformer的核心方法是transform。拿前面的Imputer为例,他首先有个训练的过程,需要拿到所有的index字段的值,做一些计算,得到一个模型,接着才能做出预测,填充缺失的值。所以Imputor是一个Estimator,同时也是一个Transformer。

当我们想把多个Imputor链接在一起,那么可以使用Pipline。

现在我们看看,如何把一个字符串转化为一个one-hot向量。为了完成这个工作,我们需要先把文本转化为依次递增的数字,然后再把数字转化为one-hot向量,为了方便,我使用了Pipline:

string_index = StringIndexer(inputCol="text",outputCol="text_number")
encoder = OneHotEncoder(inputCols=["text_number"], outputCols=["text_onehot"])
pipline = Pipeline(stages=[string_index,encoder])
pipline.fit(df).transform(df).show()
运行结果如下:


在上面的图片中你可能会感到好奇,为什么text_onehot列是类似这么一个东西:

(3,[1],[1.0])
实际的含义为:该向量长度为3,第一个位置为1。可以理解为向量稀疏表达的一种方式。

现在,让我们来一个复杂的,我希望把text字段分词(按空格),每个词用一个向量表示,组成一个二维矩阵,并且把得到行号,然后把文本字段里的词替换成行号。类似下面这样:

a: Vector1b: Vector2c: Vector3
a 在第0个位置,b在第一个位置,给定一行,比如:

a c b
最终被替换成

0 2 1
首先我们要训练词向量:

word2vec = Word2Vec(vectorSize=100, minCount=1,
                          inputCol="text",
                            outputCol="text_vector")
w2v_model = word2vec.fit(ds)
word_embedding = w2v_model.getVectors().rdd.map(
            lambda p: dict(word=p.word, vector=p.vector.values.tolist())).collect()
这一步我们把text字段喂给Word2Vec模型,然后获取得到词向量word_embedding。

word_embedding_with_index = [dict(word_index=(idx + 1), word=val["word"], vector=val["vector"]) for
                                          (idx, val)
                                          in
                                          enumerate(word_embedding)]

word_embedding_with_index.insert(0, dict(word_index=0, word="UNK",                                                    vector=np.zeros(100).tolist()))
对数组生成一个下标,从1开始,然后在数组前端插入一个记录,叫UNK,并且值是一个100维的0。这主要是我们后续在做NLP处理时解决词汇不在向量词典里的情况。

这样我们就得到了上面的关系:

词被表示成了向量
词都有一个的数字标识

从上面三个例子,我们可以看到PySpark MLlib是非常强大的,可以做非常多的特征工程相关的工作。

声明:文章收集于网络,如有侵权,请联系小编及时处理,谢谢!

欢迎加入本站公开兴趣群
软件开发技术群
兴趣范围包括:Java,C/C++,Python,PHP,Ruby,shell等各种语言开发经验交流,各种框架使用,外包项目机会,学习、培训、跳槽等交流
QQ群:26931708

Hadoop源代码研究群
兴趣范围包括:Hadoop源代码解读,改进,优化,分布式系统场景定制,与Hadoop有关的各种开源项目,总之就是玩转Hadoop
QQ群:288410967 

鲜花

握手

雷人

路过

鸡蛋

最新评论

热门频道

  • 大数据
  • 商业智能
  • 量化投资
  • 科学探索
  • 创业

即将开课

热门文章