Word2Vec是一个Estimator,它接收文档的word序列作为参数,训练一个Word2VecModel,该model会将每个word映射到一个唯一的固定大小的vector中。Word2VecModel将document中的所有word的平均,将每个document转换成一个vector;该vector可以被用在预测中使用的feature,也可用于相似度计算等等。详情可参见(mllib Word2Vec Guid)[http://spark.apache.org/docs/latest/mllib-feature-extraction.html#word2vec]
下面的代码段中,我们使用一个文档集,每个文档都表示成一个word序列串。对于每个文档,我们将它转换成一个feature vector。该feature vector可以作为参数传给响应的机器学习算法。
import org.apache.spark.ml.feature.Word2Vec
// Input data: Eache row is a bag of words from a sentence of document.
val documentDF = spark.createDataFrame(Seq(
"Hi I head about Spark".split(" "),
"I wish Java could use case classes".split(" "),
"Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")
// Learn a mapping from words to Vectors.
val word2vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.select("result").take(3).foreach(println)
CountVectorizer和CountVectorizerModel目标是帮助将一系列文本文档转换成token count的向量。如果不存在一个a-priori字典,可以使用CountVectorizer作为一个Estimator来抽取词汇表,生成一个CountVectorizerModel模型。这个模型会根据词汇表生成文档的稀疏表示,接着可以传到其他算法中(比如LDA)。
在进行fit时,CountVectorizer会选择vocabSize的top词汇,根据语料上的词频排序。一个可选的参数是minDF,通过指定在词汇表中的单个词汇出现的最小数量的文档数,它也会影响fit过程。另一个可选的二元toggle参数,可以控制输出向量。如果设为true,所有非零的count会被设置为1。这对于一些二进制的count的(非整形)离散概率模型来说很有用。
示例:下面的DataFrame,具有两列:id和texts:
id | texts |
---|---|
0 | Array("a", "b", "c") |
1 | Array("a", "b", "b", "c", "a") |
texts中的每行是一个类型为Array[String]的文档,通过调用CountVectorizer的fit,会生成一个CountVectorizerModel模型,具有词汇表(a, b, c)。接着是转换后的输出列,"vector":
id | texts | vector |
---|---|---|
0 | Array("a", "b", "c") | (3, [0, 1, 2], [1.0, 1.0, 1.0]) |
1 | Array("a", "b", "b", "c", "a") | (3, [0, 1, 2], [2.0, 2.0, 1.0]) |
每个vector表示文档再词汇表中的token count。
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
val df = spark.createDataFrame(Seq(
(0, Array("a", "b", "c")),
(1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")
// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
.setInputCol("words")
.setOutputCol("features")
.setVocabSize(3)
.setMinDF(2)
.fit(df)
// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
.setInputCol("words")
.setOutputCol("features")
cvModel.transform(df).select("features").show()
Tokenization是个文本处理过程,它将句子断成独立的terms(通常为words)。Tokenizer提供了该功能。下面的示例展示了如何将句子分割成一串词。
RegexTokenizer允许更多高级特性的tokenization: 正则表达式匹配缺省的,参数"pattern"(regex, default:"s+")被用于使用空格分割输入文本。可选的,用户可以设置参数"gaps"=false,这表示正则项"pattern"为tokens,而非分割gaps,可以找出所有匹配出现的词作为tokenization的结果。
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
val sentenceDataFrame = spark.createDataFrame(Seq(
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic, Regression, Models, are, neat")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W") // alternatively.setPattern("\\w+").setGaps(false)_
val tokenizer = tokenizer.transform(sentenceDataFrame)
tokenizer.select("words", "label").take(3).foreach(println)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("words", "label").take(3).foreach(println)
停留词(stop words)指的是那些需要从输入排除在外的词,通常是因为这些词出现很频繁,并且没啥含义。
StopWordsRemover,允许一串字符串作为输入(比如:Tokenizer的输出),并从输入的序列中抛弃掉停留词。停留词列表由stopWords参数来指定。我们缺省提供了一个stopwords的列表,可以通过在一个刚初始化的StopWordsRemover实例上调用getStopWords来获取。caseSensitive参数表示是否大小写敏感(缺省为false)。
示例:假设我们具有以下的DataFrame,具有两列(id和raw):
id | raw |
---|---|
0 | [I,saw,the,red,baloon] |
1 | [Mary,had,a,little,lamb] |
使用StopWordsRemover,输入为raw,输出为filtered,我们可以获得以下的结果:
id | raw | filtered |
---|---|---|
0 | [I,saw,the,red,baloon] | [saw,red,baloon] |
1 | [Mary,had,a,little,lamb] | [Mary,little,lamb] |
在filtered列,停留词"I", "the", "had"和"a"都已经被过滤掉了。
import org.apache.spark.ml.feature.StopWordsRemover
val remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered")
val dataSet = sqlContext.createDataFrame(Seq(
(0, Seq("I", "saw", "the", "red", "baloon")),
(1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")
remover.transform(dataSet).show()
n-gram是一个tokens序列,n为整数。NGram可以用于将输入特征转换成n-gram。
Binarization会将数值型特征(numerical features)的阈值压到二元feature中(0/1)。
Binarizer使用公共参数:inputCol和outputCol,threshold来进行二值化。Feature的值如果大于threshold,就二值化为1.0;如果值小于threshold,就二值化为0.0。inputCol支持Vector和Double类型。
import org.apache.spark.ml.feature.Binarizer
val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("label", "feature")
val binarizer: Binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5)
val binarizedDataFrame = binarizer.transform(dataFrame)
val binarizedFeature = binarizedDataFrame.select("binarized_feature")
binarizedFeatures.collect().foreach(println)
PCA是一个统计过程,它使用正交变换,将一个可能相关变量的观察集,转换成一个线性不相关变量的集合,这些变量集称为主成分(principal components)。PCA类会训练一个模型,会使用PCA技术将向量投影到一个低维空间上。下例展示了,如何将一个5维的features投影到3维的主成分上。
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df)
val pcaDF = pca.transform(df)
val result = pcaDF.select("pcaFeatures")
result.show()
多项式展开可以将你的特征展开到一个多项式空间上,它可以将原始维展开到n-degree组合上。PolynomialExpansion类提供了这个功能。下面的示例展示了将你的特征展开到3-degree的多项式空间上。
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.dense(-2.0, 2.3),
Vectors.dense(0.0, 0.0),
Vectors.dense(0.6, -1.1)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val polynomialException = new polynomialException()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3)
val polyDF = polynomialException.transform(df)
polyDF.select("polyFeatures").take(3).foreach(println)
Discrete Cosine Transform可以将在时间域(time domain)上长度为N的实数值序列,转换成另一个在频率域(frequency domain)内长度为N的实数值序列。DCT类提供了这个功能,将实现了DCT-II并将结果通过1^sqrt_2
进行归一化,以便将变换后的矩阵单元化。不需要再转换序列上进行shift操作(例如:转换序列的第0th位的元素,对应于DCT的第0th coefficient,而非第N/2 th)
DCT示例:
import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
Vectors.dense(0.0, 1.0, -2.0, 3.0),
Vectors.dense(-1.0, 2.0, 4.0, -7.0),
Vectors.dense(14.0, -2.0, -5.0, 1.0))
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val dct = new DCT()
.setInputCol("features")
.setOutputCol("featureDCT")
.setInverse(false)
val dctDF = dct.transform(df)
dctDF.select("featuresDCT").show(3)
StringIndexer将一列string列的label编码成一列label索引。索引indice的值为[0, numLabels],按label频次排序,最频繁的label会得到index 0,如果输入列是数值型的(numeric),我们可以将它转换成string,并索引string值。当对pipeline组件(比如:Estimator或Transformer)进行downstream时,可以利用string-indexed的label,你必须将该组件的输入列设置成string-indexed列名。在许多情况下,你可以使用setInputCol来完成。
示例:假设,我们具有以下的DataFrame,具有两列:id和category。
id | category |
---|---|
0 | a |
1 | b |
2 | c |
3 | a |
4 | a |
5 | c |
category有三种label:"a", "b"和"c"。使用StringIndexer对catetory作为输入列,categoryIndex作为输出列,我们可以得到下面的:
id | category | categoryIndex |
---|---|---|
0 | a | 0.0 |
1 | b | 2.0 |
2 | c | 1.0 |
3 | a | 0.0 |
4 | a | 0.0 |
5 | c | 1.0 |
"a"最频繁,所以index=0,"c"的index为1,而"b"的index为2.
另外,要牢记,当你在一个数据集上使用StringIndexer进行fit,并使用它进行转换时,有两种策略来使用StringIndex处理看不到的label:
示例:回到之前的示例,数据集为:
id | catetory |
---|---|
0 | a |
1 | b |
2 | c |
3 | d |
如果你没有设置StringIndexer是如何处理未看到label的,缺省会抛出异常。然而,如果你调用:setHandleInvalid("skip"),会生成下面的数据集:
id | category | catetoryIndex |
---|---|---|
0 | a | 0.0 |
1 | b | 2.0 |
2 | c | 1.0 |
注意:"d"行未包含其中。
import org.apache.spark.ml.feature.StringIndexer
val df = spark.createDataFrame(
Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "catetory")
val indexer = new StirngIndexer()
.setInputCol("catetory")
.setOutputCol("categoryIndex")
val indexed = indexer.fit(df).transform(df)
indexed.show()
与StringIndexer相同,IndexToString会将一列label indices映射回一个包含原始label的列,作为String。常见的用例是:使用StringIndexer产生label索引,使用这些索引训练模型,并使用IndexToString,从预测的结果咧中索引回原始的label。当然,你也可以自由地以你自己定义的label方式工作。
示例:使用StringIndexer构建的示例,假设我们有以下的DataFrame,具有列:id和categoryIndex:
id | categoryIndex |
---|---|
0 | 0.0 |
1 | 2.0 |
2 | 1.0 |
3 | 0.0 |
4 | 0.0 |
5 | 1.0 |
接着,我们使用indexToString,将categoryIndex最为输入列,originalCategory作为输出列,我们可以检索回我们的原始label:
id | catetoryIndex | originalCategory |
---|---|---|
0 | 0.0 | a |
1 | 2.0 | b |
2 | 1.0 | c |
3 | 0.0 | a |
4 | 0.0 | a |
5 | 1.0 | c |
示例代码:
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "catetory")
val indexer = new StringIndexer()
.setInputCol("catetory")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
val converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory")
val converted = converter.transform(indexed)
converted.select("id", "originalCategory").show()
One-hot encoding会将一列label索引映射到一列二元表示的向量,至多只有一个值。这种编码允许某些期望是连续型feature算法。比如:Logistic Regression,来使用类别型的feature。
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
val encoder = new OneHotEncoder()
.setInputCol("categoryIndex")
.setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.select("id", "catetoryVec").show()
VectorIndex可以用于索引Vectors类型数据集中的类别型feature。它可以自动决定哪个feature是类别型的(categorical),并将它的原始值转换成类别id索引。必须按照以下操作进行:
类别型feature的索引,允许类似的算法:决策树,Ensembles树来合理地处理类别型feature,以提升性能。
在下面的示例中:
import org.apache.spark.ml.feature.VectorIndexer
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10)
val indexerModel = indexer.fit(data)
val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " +
categoricalFeatures.mkString(","))
// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
Normalizer是一个Transformer,它可以将多行Vector的数据集进行转换,将每个Vector归一化成unit normal形式。它使用参数p,该参数指定了p-norm用于归一化。(缺省:p=2),该归一化能将你的输入数据标准化,并改善学习算法的行为。
下面的示例,展示了如何以libsvm的格式加载数据集,接着对每行进行L2范式的归一化,和其他L无穷范式的归一化。
import org.apache.spark.ml.feature.Normalizer
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Normalizer each Vector using $L^1$ norm.
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0)
val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()
// Normalize each Vector using $L^infty$ norm.
val 1InfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
1InfNormData.show()
StandardScaler可以将多行的Vector数据进行转换,将每个feature归一化到具有单位标准差,和零均值的范围上。它的参数有:
StandardScaler是一个Estimator,它可以在数据集上进行fit操作,产生一个StandardScalerModel;这相当于计算总的统计。接着,该模型可以将数据集中的某个Vector列,转换成具有标准差为1和/或零均值的feature中。
注意:如果feature的标准差为0,它在Vector的对应feature上返回缺省的0.0值。
下面的示例展示了,如何在libsvm格式加载一个数据集,并接着将每个feature归一化到标准差为1.
import org.apache.spark.ml.feature.StandardScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new StandardScaler()
.setInputCol("feature")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)
// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
MinMaxScaler可以对多行的Vector数据集进行转换,将每个feature归一化到一个指定的范围内(比如:常用的[0,1])。它的参数:
MinMaxScaler会计算在数据集上的汇总统计,并产生一个MinMaxScalerModel。该模型可以将每个feature独自地转换到该指定范围中。
归一化(rescale)计算如下:
Rescaled(e_i) = (e_i - E_min) / (E_max - E_min) * (max - min) + min
如果:E_max == E_min,那么:Rescaled(e_i) = 0.5 * (max+ min)
注意:由于零值可能被转换成非零值,对于sparse的输入,转换器的输出也必须是DenseVector。
下例展示了如何以libsvm的格式加载数据集,将每个feature归一化到[0,1]中。
import org.apache.saprk.ml.feature.MinMaxScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
MaxAbsScaler将每个feature转换成[-1, 1]上,通过每个feature的最大绝对值。它不会shift/center化数据,因为这样会毁了数据的稀疏性。
MaxAbsScaler会计算数据集上的汇总统计信息,并产生一个MaxAbsScalerModel。接着使用该model将每个feature单独转换到[-1, 1]的范围。
下列展示了如何加载libsvm的数据集,接着将每个feature归一化到[-1, 1]上。
import org.apache.spark.ml.feature.MaxAbsScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Bucketizer会将一列连续的feature转换成一列feature buckets,对应的buckets由用于指定。它的参数如下:
注意,如果你不知道目标列的上界与下界,你应该添加Double.NegativeInfinity和Double.PositiveInfinity作为你的边界,以防止Bucketizer的潜在异常。
注意:splits必须严格递增:比如:s0 < s1 < s2 < ... < sn。
下面的示例,展示了如何将Double列分桶成另一个index-wised列。
import org.apache.spark.ml.feature.Bucketizer
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits)
// Transform original data into its bucket index.
val bucketdData = bucketizer.transform(dataFrame)
bucketdData.show()
ElementwiseProduct会将每个输入向量以及一个由它提供的"weight"向量相乘,使用element-wise乘法。换句话说,它将数据集的每一列都乘以一个scalar因子进行归一化。这种方式表示了input向量v,和转换向量w之间的Hadamard product,产生一个新的结果向量。
示例:
import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors
// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
("a", Vectors.dense(1.0, 2.0, 3.0)),
("b", Vectors.dense(4.0, 5.0, 6.0))
)).toDF("id", "vector")
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformingVector")
// Batch transform the vectors to create new column
transformer.transform(dataFrame).show()
SQLTransformer实现了由sql语句定义的转换。当前,我们只支持这样的SQL语法:"SELECT...FROM _THIS_..."其中的"_THIS_"表示了输入数据集的底层表。select语句指定了在输出中展示相应的字段,常量,表达式,可以是任何Spark SQL支持的select语句。用户可以使用Spark SQL的内建函数和UDF来操作选中的列。例如,SQLTransformer支持如下声明:
示例:DataFrame有三列,id,v1,v2
id | v1 | v2 |
---|---|---|
0 | 1.0 | 3.0 |
2 | 2.0 | 5.0 |
SQLTransformer相应的输出可以使用这样的语句:"SELECT , (v1 + v2) AS v3, (v1 v2) AS v4 FROM __THIS__"
id | v1 | v2 | v3 | v4 |
---|---|---|---|---|
0 | 1.0 | 3.0 | 4.0 | 3.0 |
2 | 2.0 | 5.0 | 7.0 | 10.0 |
import org.apache.spark.ml.feature.SQLTransformer
val df = spark.createDataFrame(
Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
val sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
VectorAssembler是一个transformer,它可以将给定的多列转换成单个vector列。这对于将多个raw features、以及由不同的feature transformers生成的features,组合成单个feature vector,以便于训练ML model(比如logistic regression和决策树)。VectorAssembler接受下面的输入列类型:所有numeric类型,boolean类型,vector类型。在每行中,输入列的值都会被串联成一个指定顺序的vector。
示例:假设我们具有一个DataFrame,它具有这样的列:id, hour, mobile, userFeatures, and clicked:
id | hour | mobile | userFeatures | clicked |
---|---|---|---|---|
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 |
userFeatures是一个vector列,它包含了三个用户特征。我们希望将hour,mobile,userFeatures组合成一个叫做"features"的feature vector,并使用它来预测clicked or not。如果我们将VectorAssembler的输入列设置成hour,mobile,userFeatures,将输出列设置成features,在该转换后,会得到以下的DataFrame:
id | hour | mobile | userFeatures | clicked | features |
---|---|---|---|---|---|
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5] |
示例代码:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val assembler = new VectorAssembler()
.setInputCol(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
val output = assembler.transform(dataset)
println(output.select("features", "clicked").first())
QuantileDiscretizer可以将一列连续型特征作为输入,产生一列二进制型类别特征(binned categorical features)。二进制的数目通过参数numBuckets进行设定。bin选择的范围,可以使用一个近似算法(详见文档(approxQuantile)[http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameStatFunctions])。这种逼近的precision可以通过relativeError参数进行控制。如果该参数设置为0,会得到精准的分位数(注意:计算精准分位数是一项开销很昂贵的操作)。bin的上下界可以是(-Infinity, Infinity),这样可以覆盖整个实数集。
示例:假设我们就有一个DataFrame,它的列为:id, hour:
id | hour |
---|---|
0 | 18.0 |
1 | 19.0 |
2 | 8.0 |
3 | 5.0 |
4 | 2.2 |
hour是一个连续型feature,Double类型。我们希望将连续型feature转换成一个类别型feature。通过设置numBuckets=3,我们可以得到以下的DataFrame:
id | hour | result |
---|---|---|
0 | 18.0 | 2.0 |
1 | 19.0 | 2.0 |
2 | 8.0 | 1.0 |
3 | 5.0 | 1.0 |
4 | 2.2 | 0.0 |
import org.apache.spark.ml.feature.QuantileDiscretizer
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)
val result = disretizer.fit(df).transform(df)
result.show()
VectorSlicer是个转换器:它的输入是一个feature vector,输出为一个新的feature vector(是原始feature的子集)。它对于从一个vector列中抽取feature来说很管用。
VectorSlicer接受指定indice的向量列,接着输出一个新的向量列,它们的值通过indice来选定。有两种类型的indice:
可以通过integer和string进行指定。也就是说,你可以同时使用integer索引和string名。必须至少选择一个feature。不允许重复的feature,因此自选择indices和names时不能重复。注意,如果features的名字被选中了,如果输入的属性是空的会抛出异常。
输出的vector会对feature进行排序,选中的indices按首位,接下去才是选中的names。
RFormula通过指定一个R model formula选择列。当前,我们支持R操作符的一个限定集,包括:'~'‘~’, ‘.’, ‘:’, ‘+’, 和 ‘-‘。基本操作符是:
假设a和b都是double的列,我们可以使用下面的简单示例来展示RFormula的效果:
RFormula会生成一个vector的feature列,和一个double或string类型的label列。和在R中使用线性回归一样的formula一样,string类型的input列将会进行one-hot encoded,而numeric列则会被转成double。如果label列是string类型,它将会首先被StringIndexer转换成double。如果label列在DataFrame中不存在,输出的label列将会从formula中指定的响应变量上被创建。
示例:假设,DataFrame有响应的列:id,country,hour和clicked
id | country | hour | clicked |
---|---|---|---|
7 | "US" | 18 | 1.0 |
8 | "CA" | 12 | 0.0 |
9 | "NZ" | 15 | 0.0 |
如果我们使用RFormula,相应的formula string为:clicked ~ country + hour,这意味着,我们希望基于country和hour区预测clicked,在转换后,我们应该可以得到下面的DataFrame:
id | country | hour | clicked | features | label |
---|---|---|---|---|---|
7 | "US" | 18 | 1.0 | [0.0, 0.0, 18.0] | 1.0 |
8 | "CA" | 12 | 0.0 | [0.0, 1.0, 12.0] | 0.0 |
9 | "NZ" | 15 | 0.0 | [1.0, 0.0, 15.0] | 0.0 |
相应的代码如下:
import org.apache.spark.ml.feature.RFormula
val dataset = spark.createDataFrame(Seq(
(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")
val formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label")
val output = formula.fit(dataset).transform(dataset)
output.select("feature", "label").show()
ChiSqSelector表示使用卡方分布的选择。它也可以在类别型feature的数据集上进行操作。ChiSqSelector会基于卡方检验来排序feature,接着会选择(筛选)出最独立的top features。
示例:我们具有一个DataFrame,相应的列为:id, features和clicked,它用于我们要预测的target:
id | features | clicked |
---|---|---|
7 | [0.0, 0.0, 18.0, 1.0] | 1.0 |
8 | [0.0, 1.0, 12.0, 0.0] | 0.0 |
9 | [1.0, 0.0, 15.0, 0.1] | 0.0 |
我们使用ChiSqSelector,设置:numTopFeatures = 1,接着根据我们的对应clicked的label作为最后一列,它会在features上选择最有用的一列feature:
id | features | clicked | selectedFeatures |
---|---|---|---|
7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0] |
8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0] |
9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1] |
示例代码:
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)
val df = spark.createDataFrame(data).toDF("id", "features", "clicked")
val selector = new ChiSqSelector()
.setNumTopFeature(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
result.show()