Spark ML的Feature提取、转换、选择

liang @ 2018年05月13日

1 特征抽取

1.1 TF-IDF

1.2 Word2Vec

Word2Vec是一个Estimator,它接收文档的word序列作为参数,训练一个Word2VecModel,该model会将每个word映射到一个唯一的固定大小的vector中。Word2VecModel将document中的所有word的平均,将每个document转换成一个vector;该vector可以被用在预测中使用的feature,也可用于相似度计算等等。详情可参见(mllib Word2Vec Guid)[http://spark.apache.org/docs/latest/mllib-feature-extraction.html#word2vec]

下面的代码段中,我们使用一个文档集,每个文档都表示成一个word序列串。对于每个文档,我们将它转换成一个feature vector。该feature vector可以作为参数传给响应的机器学习算法。

import org.apache.spark.ml.feature.Word2Vec

// Input data: Eache row is a bag of words from a sentence of document.
val documentDF = spark.createDataFrame(Seq(
    "Hi I head about Spark".split(" "),
    "I wish Java could use case classes".split(" "),
    "Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")

// Learn a mapping from words to Vectors.
val word2vec = new Word2Vec()
    .setInputCol("text")
    .setOutputCol("result")
    .setVectorSize(3)
    .setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.select("result").take(3).foreach(println)

1.3 CountVectorizer

CountVectorizer和CountVectorizerModel目标是帮助将一系列文本文档转换成token count的向量。如果不存在一个a-priori字典,可以使用CountVectorizer作为一个Estimator来抽取词汇表,生成一个CountVectorizerModel模型。这个模型会根据词汇表生成文档的稀疏表示,接着可以传到其他算法中(比如LDA)。

在进行fit时,CountVectorizer会选择vocabSize的top词汇,根据语料上的词频排序。一个可选的参数是minDF,通过指定在词汇表中的单个词汇出现的最小数量的文档数,它也会影响fit过程。另一个可选的二元toggle参数,可以控制输出向量。如果设为true,所有非零的count会被设置为1。这对于一些二进制的count的(非整形)离散概率模型来说很有用。

示例:下面的DataFrame,具有两列:id和texts:

idtexts
0Array("a", "b", "c")
1Array("a", "b", "b", "c", "a")

texts中的每行是一个类型为Array[String]的文档,通过调用CountVectorizer的fit,会生成一个CountVectorizerModel模型,具有词汇表(a, b, c)。接着是转换后的输出列,"vector":

idtextsvector
0Array("a", "b", "c")(3, [0, 1, 2], [1.0, 1.0, 1.0])
1Array("a", "b", "b", "c", "a")(3, [0, 1, 2], [2.0, 2.0, 1.0])

每个vector表示文档再词汇表中的token count。

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

val df = spark.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")

// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
    .setInputCol("words")
    .setOutputCol("features")
    .setVocabSize(3)
    .setMinDF(2)
    .fit(df)
    
// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
    .setInputCol("words")
    .setOutputCol("features")
    
cvModel.transform(df).select("features").show()

2 特征转换

2.1 Tokenizer

Tokenization是个文本处理过程,它将句子断成独立的terms(通常为words)。Tokenizer提供了该功能。下面的示例展示了如何将句子分割成一串词。

RegexTokenizer允许更多高级特性的tokenization: 正则表达式匹配缺省的,参数"pattern"(regex, default:"s+")被用于使用空格分割输入文本。可选的,用户可以设置参数"gaps"=false,这表示正则项"pattern"为tokens,而非分割gaps,可以找出所有匹配出现的词作为tokenization的结果。

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}

val sentenceDataFrame = spark.createDataFrame(Seq(
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic, Regression, Models, are, neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
    .setInputCol("sentence")
    .setOutputCol("words")
    .setPattern("\\W")    // alternatively.setPattern("\\w+").setGaps(false)_

val tokenizer = tokenizer.transform(sentenceDataFrame)
tokenizer.select("words", "label").take(3).foreach(println)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("words", "label").take(3).foreach(println)

2.2 StopWordsRemover

停留词(stop words)指的是那些需要从输入排除在外的词,通常是因为这些词出现很频繁,并且没啥含义。

StopWordsRemover,允许一串字符串作为输入(比如:Tokenizer的输出),并从输入的序列中抛弃掉停留词。停留词列表由stopWords参数来指定。我们缺省提供了一个stopwords的列表,可以通过在一个刚初始化的StopWordsRemover实例上调用getStopWords来获取。caseSensitive参数表示是否大小写敏感(缺省为false)。

示例:假设我们具有以下的DataFrame,具有两列(id和raw):

idraw
0[I,saw,the,red,baloon]
1[Mary,had,a,little,lamb]

使用StopWordsRemover,输入为raw,输出为filtered,我们可以获得以下的结果:

idrawfiltered
0[I,saw,the,red,baloon][saw,red,baloon]
1[Mary,had,a,little,lamb][Mary,little,lamb]

在filtered列,停留词"I", "the", "had"和"a"都已经被过滤掉了。

import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
    .setInputCol("raw")
    .setOutputCol("filtered")
    
val dataSet = sqlContext.createDataFrame(Seq(
    (0, Seq("I", "saw", "the", "red", "baloon")),
    (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show()

2.3 n-gram

n-gram是一个tokens序列,n为整数。NGram可以用于将输入特征转换成n-gram。

2.4 Binarizer

Binarization会将数值型特征(numerical features)的阈值压到二元feature中(0/1)。

Binarizer使用公共参数:inputCol和outputCol,threshold来进行二值化。Feature的值如果大于threshold,就二值化为1.0;如果值小于threshold,就二值化为0.0。inputCol支持Vector和Double类型。

import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("label", "feature")

val binarizer: Binarizer = new Binarizer()
    .setInputCol("feature")
    .setOutputCol("binarized_feature")
    .setThreshold(0.5)
    
val binarizedDataFrame = binarizer.transform(dataFrame)
val binarizedFeature = binarizedDataFrame.select("binarized_feature")
binarizedFeatures.collect().foreach(println)

2.5 PCA

PCA是一个统计过程,它使用正交变换,将一个可能相关变量的观察集,转换成一个线性不相关变量的集合,这些变量集称为主成分(principal components)。PCA类会训练一个模型,会使用PCA技术将向量投影到一个低维空间上。下例展示了,如何将一个5维的features投影到3维的主成分上。

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
    Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)

val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
    .setInputCol("features")
    .setOutputCol("pcaFeatures")
    .setK(3)
    .fit(df)
val pcaDF = pca.transform(df)
val result = pcaDF.select("pcaFeatures")
result.show()

2.6 多项式展开(PolynomialExpansion)

多项式展开可以将你的特征展开到一个多项式空间上,它可以将原始维展开到n-degree组合上。PolynomialExpansion类提供了这个功能。下面的示例展示了将你的特征展开到3-degree的多项式空间上。

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = Array(
    Vectors.dense(-2.0, 2.3),
    Vectors.dense(0.0, 0.0),
    Vectors.dense(0.6, -1.1)
)

val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val polynomialException = new polynomialException()
    .setInputCol("features")
    .setOutputCol("polyFeatures")
    .setDegree(3)
    
val polyDF = polynomialException.transform(df)
polyDF.select("polyFeatures").take(3).foreach(println)

2.7 离散cosine变化(DCT)

Discrete Cosine Transform可以将在时间域(time domain)上长度为N的实数值序列,转换成另一个在频率域(frequency domain)内长度为N的实数值序列。DCT类提供了这个功能,将实现了DCT-II并将结果通过1^sqrt_2进行归一化,以便将变换后的矩阵单元化。不需要再转换序列上进行shift操作(例如:转换序列的第0th位的元素,对应于DCT的第0th coefficient,而非第N/2 th)

DCT示例:

import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))
  
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
    .setInputCol("features")
    .setOutputCol("featureDCT")
    .setInverse(false)
    
val dctDF = dct.transform(df)
dctDF.select("featuresDCT").show(3)

2.8 StringIndexer

StringIndexer将一列string列的label编码成一列label索引。索引indice的值为[0, numLabels],按label频次排序,最频繁的label会得到index 0,如果输入列是数值型的(numeric),我们可以将它转换成string,并索引string值。当对pipeline组件(比如:Estimator或Transformer)进行downstream时,可以利用string-indexed的label,你必须将该组件的输入列设置成string-indexed列名。在许多情况下,你可以使用setInputCol来完成。

示例:假设,我们具有以下的DataFrame,具有两列:id和category。

idcategory
0a
1b
2c
3a
4a
5c

category有三种label:"a", "b"和"c"。使用StringIndexer对catetory作为输入列,categoryIndex作为输出列,我们可以得到下面的:

idcategorycategoryIndex
0a0.0
1b2.0
2c1.0
3a0.0
4a0.0
5c1.0

"a"最频繁,所以index=0,"c"的index为1,而"b"的index为2.

另外,要牢记,当你在一个数据集上使用StringIndexer进行fit,并使用它进行转换时,有两种策略来使用StringIndex处理看不到的label:

  • 抛出一个exception(缺省方式)
  • 跳过包含unseen label的整行

示例:回到之前的示例,数据集为:

idcatetory
0a
1b
2c
3d

如果你没有设置StringIndexer是如何处理未看到label的,缺省会抛出异常。然而,如果你调用:setHandleInvalid("skip"),会生成下面的数据集:

idcategorycatetoryIndex
0a0.0
1b2.0
2c1.0

注意:"d"行未包含其中。

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
    Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
    ).toDF("id", "catetory")
    
val indexer = new StirngIndexer()
    .setInputCol("catetory")
    .setOutputCol("categoryIndex")
    
val indexed = indexer.fit(df).transform(df)
indexed.show()

2.9 IndexToString

与StringIndexer相同,IndexToString会将一列label indices映射回一个包含原始label的列,作为String。常见的用例是:使用StringIndexer产生label索引,使用这些索引训练模型,并使用IndexToString,从预测的结果咧中索引回原始的label。当然,你也可以自由地以你自己定义的label方式工作。

示例:使用StringIndexer构建的示例,假设我们有以下的DataFrame,具有列:id和categoryIndex:

idcategoryIndex
00.0
12.0
21.0
30.0
40.0
51.0

接着,我们使用indexToString,将categoryIndex最为输入列,originalCategory作为输出列,我们可以检索回我们的原始label:

idcatetoryIndexoriginalCategory
00.0a
12.0b
21.0c
30.0a
40.0a
51.0c

示例代码:

import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "catetory")

val indexer = new StringIndexer()
  .setInputCol("catetory")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")
  
val converted = converter.transform(indexed)
converted.select("id", "originalCategory").show()

2.10 OneHotEncoder

One-hot encoding会将一列label索引映射到一列二元表示的向量,至多只有一个值。这种编码允许某些期望是连续型feature算法。比如:Logistic Regression,来使用类别型的feature。

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.select("id", "catetoryVec").show()

2.11 VectorIndexer

VectorIndex可以用于索引Vectors类型数据集中的类别型feature。它可以自动决定哪个feature是类别型的(categorical),并将它的原始值转换成类别id索引。必须按照以下操作进行:

  • 使用一个Vector类型的输入列,和一个参数maxCategories
  • 基于不同的值来决定哪个feature应该是类别型的,features的最大类别数由maxCategories来指定。
  • 为每个类别feature,计算从0开始的类别id。
  • 索引类别型feature,将原始feature转换成索引id。

类别型feature的索引,允许类似的算法:决策树,Ensembles树来合理地处理类别型feature,以提升性能。

在下面的示例中:

import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)
  
val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " + 
  categoricalFeatures.mkString(","))
  
// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()

2.12 Normalizer

Normalizer是一个Transformer,它可以将多行Vector的数据集进行转换,将每个Vector归一化成unit normal形式。它使用参数p,该参数指定了p-norm用于归一化。(缺省:p=2),该归一化能将你的输入数据标准化,并改善学习算法的行为。

下面的示例,展示了如何以libsvm的格式加载数据集,接着对每行进行L2范式的归一化,和其他L无穷范式的归一化。

import org.apache.spark.ml.feature.Normalizer

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Normalizer each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()

// Normalize each Vector using $L^infty$ norm.
val 1InfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
1InfNormData.show()

2.13 StandardScaler

StandardScaler可以将多行的Vector数据进行转换,将每个feature归一化到具有单位标准差,和零均值的范围上。它的参数有:

  • withStd: 缺省为True。将数据归一化到标准差=1的范围上。
  • withMean:缺省为False。在归一化前,将数据进行中心化。它会构建一个dense型的输出,因此,在稀疏数据集上对它进行操作会抛出一个异常。

StandardScaler是一个Estimator,它可以在数据集上进行fit操作,产生一个StandardScalerModel;这相当于计算总的统计。接着,该模型可以将数据集中的某个Vector列,转换成具有标准差为1和/或零均值的feature中。

注意:如果feature的标准差为0,它在Vector的对应feature上返回缺省的0.0值。

下面的示例展示了,如何在libsvm格式加载一个数据集,并接着将每个feature归一化到标准差为1.

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("feature")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)
  
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

2.14 MinMaxScaler

MinMaxScaler可以对多行的Vector数据集进行转换,将每个feature归一化到一个指定的范围内(比如:常用的[0,1])。它的参数:

  • min:缺省0.0,转换的下界,所有feature都公用。
  • max:缺省1.0,转换的下界,所有feature都公用。

MinMaxScaler会计算在数据集上的汇总统计,并产生一个MinMaxScalerModel。该模型可以将每个feature独自地转换到该指定范围中。

归一化(rescale)计算如下:

Rescaled(e_i) = (e_i - E_min) / (E_max - E_min) * (max - min) + min

如果:E_max == E_min,那么:Rescaled(e_i) = 0.5 * (max+ min)

注意:由于零值可能被转换成非零值,对于sparse的输入,转换器的输出也必须是DenseVector。

下例展示了如何以libsvm的格式加载数据集,将每个feature归一化到[0,1]中。

import org.apache.saprk.ml.feature.MinMaxScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

2.15 MaxAbsScaler

MaxAbsScaler将每个feature转换成[-1, 1]上,通过每个feature的最大绝对值。它不会shift/center化数据,因为这样会毁了数据的稀疏性。

MaxAbsScaler会计算数据集上的汇总统计信息,并产生一个MaxAbsScalerModel。接着使用该model将每个feature单独转换到[-1, 1]的范围。

下列展示了如何加载libsvm的数据集,接着将每个feature归一化到[-1, 1]上。

import org.apache.spark.ml.feature.MaxAbsScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  
// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

2.16 Bucketizer

Bucketizer会将一列连续的feature转换成一列feature buckets,对应的buckets由用于指定。它的参数如下:

  • split: 该参数用于将连续型特征(continus features)映射成buckets。如果split为n+1,则表示有n个buckets。一个bucket的定义通过在[x,y)范围上的值进行split,除了最后一个bucket,它也包含y。splits应该是严格递增的。在(-inf, inf)上的值必须显式提供以覆盖所有Double值。否则,splits外指定的值会被认为是错误的。splits的两个示例:Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity),Array(0.0, 1.0, 2.0)。

注意,如果你不知道目标列的上界与下界,你应该添加Double.NegativeInfinity和Double.PositiveInfinity作为你的边界,以防止Bucketizer的潜在异常。

注意:splits必须严格递增:比如:s0 < s1 < s2 < ... < sn。

下面的示例,展示了如何将Double列分桶成另一个index-wised列。

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)
  
// Transform original data into its bucket index.
val bucketdData = bucketizer.transform(dataFrame)
bucketdData.show()

2.17 ElementwiseProduct

ElementwiseProduct会将每个输入向量以及一个由它提供的"weight"向量相乘,使用element-wise乘法。换句话说,它将数据集的每一列都乘以一个scalar因子进行归一化。这种方式表示了input向量v,和转换向量w之间的Hadamard product,产生一个新的结果向量。

示例:

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0))
)).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformingVector")
  
// Batch transform the vectors to create new column
transformer.transform(dataFrame).show()

2.18 SQLTransformer

SQLTransformer实现了由sql语句定义的转换。当前,我们只支持这样的SQL语法:"SELECT...FROM _THIS_..."其中的"_THIS_"表示了输入数据集的底层表。select语句指定了在输出中展示相应的字段,常量,表达式,可以是任何Spark SQL支持的select语句。用户可以使用Spark SQL的内建函数和UDF来操作选中的列。例如,SQLTransformer支持如下声明:

  • SELECT a, a+b AS a_b FROM THIS
  • SELECT a, SQRT(b) AS b_sqrt FROM THIS where a > 5
  • SELECT a, b, SUM(c) AS c_sum FROM THIS GROUP BY a, b

示例:DataFrame有三列,id,v1,v2

idv1v2
01.03.0
22.05.0

SQLTransformer相应的输出可以使用这样的语句:"SELECT , (v1 + v2) AS v3, (v1 v2) AS v4 FROM __THIS__"

idv1v2v3v4
01.03.04.03.0
22.05.07.010.0
import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
  
val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()

2.19 VectorAssembler

VectorAssembler是一个transformer,它可以将给定的多列转换成单个vector列。这对于将多个raw features、以及由不同的feature transformers生成的features,组合成单个feature vector,以便于训练ML model(比如logistic regression和决策树)。VectorAssembler接受下面的输入列类型:所有numeric类型,boolean类型,vector类型。在每行中,输入列的值都会被串联成一个指定顺序的vector。

示例:假设我们具有一个DataFrame,它具有这样的列:id, hour, mobile, userFeatures, and clicked:

idhourmobileuserFeaturesclicked
0181.0[0.0, 10.0, 0.5]1.0

userFeatures是一个vector列,它包含了三个用户特征。我们希望将hour,mobile,userFeatures组合成一个叫做"features"的feature vector,并使用它来预测clicked or not。如果我们将VectorAssembler的输入列设置成hour,mobile,userFeatures,将输出列设置成features,在该转换后,会得到以下的DataFrame:

idhourmobileuserFeaturesclickedfeatures
0181.0[0.0, 10.0, 0.5]1.0[18.0, 1.0, 0.0, 10.0, 0.5]

示例代码:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCol(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")
  
val output = assembler.transform(dataset)
println(output.select("features", "clicked").first())

2.20 QuantileDiscretizer

QuantileDiscretizer可以将一列连续型特征作为输入,产生一列二进制型类别特征(binned categorical features)。二进制的数目通过参数numBuckets进行设定。bin选择的范围,可以使用一个近似算法(详见文档(approxQuantile)[http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameStatFunctions])。这种逼近的precision可以通过relativeError参数进行控制。如果该参数设置为0,会得到精准的分位数(注意:计算精准分位数是一项开销很昂贵的操作)。bin的上下界可以是(-Infinity, Infinity),这样可以覆盖整个实数集。

示例:假设我们就有一个DataFrame,它的列为:id, hour:

idhour
018.0
119.0
28.0
35.0
42.2

hour是一个连续型feature,Double类型。我们希望将连续型feature转换成一个类别型feature。通过设置numBuckets=3,我们可以得到以下的DataFrame:

idhourresult
018.02.0
119.02.0
28.01.0
35.01.0
42.20.0
import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)
  
val result = disretizer.fit(df).transform(df)
result.show()

3 特征选择

3.1 VectorSlicer

VectorSlicer是个转换器:它的输入是一个feature vector,输出为一个新的feature vector(是原始feature的子集)。它对于从一个vector列中抽取feature来说很管用。

VectorSlicer接受指定indice的向量列,接着输出一个新的向量列,它们的值通过indice来选定。有两种类型的indice:

    1. Integer类型的indice,表示vector的索引,通过setIndices()设置。
    1. String类型的indice,表示vector的feature名,通过setNames(**设置。它需要向量列具备一个AttributeGroup,因为它的实现需要与Attribute的名字相匹配。

可以通过integer和string进行指定。也就是说,你可以同时使用integer索引和string名。必须至少选择一个feature。不允许重复的feature,因此自选择indices和names时不能重复。注意,如果features的名字被选中了,如果输入的属性是空的会抛出异常。

输出的vector会对feature进行排序,选中的indices按首位,接下去才是选中的names。

3.2 RFormula

RFormula通过指定一个R model formula选择列。当前,我们支持R操作符的一个限定集,包括:'~'‘~’, ‘.’, ‘:’, ‘+’, 和 ‘-‘。基本操作符是:

  • ~: 分隔target和terms
  • +: concat terms, "+0"就意味着removing intercept
  • -: 移除一个term,"-1"意味着removing intercept
  • :: 交互
  • .: 所有列除了target

假设a和b都是double的列,我们可以使用下面的简单示例来展示RFormula的效果:

  • y ~ a + b => 表示模型:y ~ w0 + w1 a + w2 b,w0是intercept,而w1和w2是coefficients
  • y ~ a + b + a:b - 1 => 表示模型:y ~ w1 a + w2 b + w3 a b,其中,w1,w2,w3是coefficients

RFormula会生成一个vector的feature列,和一个double或string类型的label列。和在R中使用线性回归一样的formula一样,string类型的input列将会进行one-hot encoded,而numeric列则会被转成double。如果label列是string类型,它将会首先被StringIndexer转换成double。如果label列在DataFrame中不存在,输出的label列将会从formula中指定的响应变量上被创建。

示例:假设,DataFrame有响应的列:id,country,hour和clicked

idcountryhourclicked
7"US"181.0
8"CA"120.0
9"NZ"150.0

如果我们使用RFormula,相应的formula string为:clicked ~ country + hour,这意味着,我们希望基于country和hour区预测clicked,在转换后,我们应该可以得到下面的DataFrame:

idcountryhourclickedfeatureslabel
7"US"181.0[0.0, 0.0, 18.0]1.0
8"CA"120.0[0.0, 1.0, 12.0]0.0
9"NZ"150.0[1.0, 0.0, 15.0]0.0

相应的代码如下:

import org.apache.spark.ml.feature.RFormula

val dataset = spark.createDataFrame(Seq(
  (7, "US", 18, 1.0),
  (8, "CA", 12, 0.0),
  (9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")

val formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label")
  
val output = formula.fit(dataset).transform(dataset)
output.select("feature", "label").show()

3.3 ChiSqSelector

ChiSqSelector表示使用卡方分布的选择。它也可以在类别型feature的数据集上进行操作。ChiSqSelector会基于卡方检验来排序feature,接着会选择(筛选)出最独立的top features。

示例:我们具有一个DataFrame,相应的列为:id, features和clicked,它用于我们要预测的target:

idfeaturesclicked
7[0.0, 0.0, 18.0, 1.0]1.0
8[0.0, 1.0, 12.0, 0.0]0.0
9[1.0, 0.0, 15.0, 0.1]0.0

我们使用ChiSqSelector,设置:numTopFeatures = 1,接着根据我们的对应clicked的label作为最后一列,它会在features上选择最有用的一列feature:

idfeaturesclickedselectedFeatures
7[0.0, 0.0, 18.0, 1.0]1.0[1.0]
8[0.0, 1.0, 12.0, 0.0]0.0[0.0]
9[1.0, 0.0, 15.0, 0.1]0.0[0.1]

示例代码:

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)

val df = spark.createDataFrame(data).toDF("id", "features", "clicked")

val selector = new ChiSqSelector()
  .setNumTopFeature(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")
  
val result = selector.fit(df).transform(df)
result.show()