Spark ML中的特征转换算法(二)
一、VectorIndexer
VectorIndexer 帮助索引向量数据集中的分类特征。它既可以自动决定哪些特征是分类的,也可以将原始值转换为类别索引。具体来说,它执行以下操作:
- 获取 Vector 类型的输入列和参数 maxCategories;
- 根据不同值的数量决定哪些特征应该是分类的,其中最多 maxCategories 的特征被声明为分类的;
- 为每个分类特征计算基于 0 的分类索引;
- 为每个分类特征计算基于 0 的分类索引;
- 索引分类特征允许决策树和树集成等算法适当地处理分类特征,从而提高性能。
示例:
在下面的示例中,我们读入标记点的数据集,然后使用 VectorIndexer 来决定哪些特征应该被视为分类特征。我们将分类特征值转换为它们的索引。然后,可以将转换后的数据传递给处理分类特征的算法,例如 DecisionTreeRegressor。
%spark // 特征转换 —— —— VectorIndexer // VectorIndexer是对数据集特征向量中的类别(离散值)特征进行编号。主要作用:提高决策树或随机森林等ML方法的分类效果。 // 它能够自动判断那些特征是离散值型的特征,并对他们进行编号,具体做法是通过设置一个maxCategories,特征向量中某一个特征不重复取值个数小于maxCategories,则被重新编号为0~K(K<=maxCategories-1)。某一个特征不重复取值个数大于maxCategories,则该特征视为连续值,不会重新编号(不会发生任何改变)。 import org.apache.spark.ml.feature.VectorIndexer val data = spark.read.format("libsvm").load("/data/mllib/sample_libsvm_data.txt") val indexer = new VectorIndexer() .setInputCol("features") .setOutputCol("indexed") // 分类特征可以采用的值数量的阈值。如果发现一个特征具有 > maxCategories 值,则它被声明为连续的。 // 该值必须大于或等于 2。默认为20 .setMaxCategories(10) // 默认error // .setHandleInvalid("error") val indexerModel = indexer.fit(data) val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet println(s"Chose ${categoricalFeatures.size} " + s"categorical features: ${categoricalFeatures.mkString(", ")}") // 使用转换为索引的分类值创建新列“索引” val indexedData = indexerModel.transform(data) indexedData.show()
二、Interaction
实现特征交互变换。 该转换器采用 Double 和 Vector 类型的列,并输出它们的特征交互的扁平向量。 为了处理交互,我们首先对任何名义特征进行一次性编码。 然后,生成特征叉积的向量。
例如,给定输入特征值 Double(2) 和 Vector(3, 4),如果所有输入特征都是数字,则输出将为 Vector(6, 8)。 如果第一个特征是具有四个类别的名义特征,则输出将为 Vector(0, 0, 0, 0, 3, 4, 0, 0)。
%spark // 特征转换 —— —— Interaction // Interaction是一个转换器,它采用向量或双值列,并生成一个向量列,其中包含每个输入列中一个值的所有组合的乘积 // 例如,如果您有 2 个向量类型列,每个列都有 3 个维度作为输入列,那么您将获得一个 9 维向量作为输出列。 import org.apache.spark.ml.feature.Interaction import org.apache.spark.ml.feature.VectorAssembler val df = spark.createDataFrame(Seq( (1, 1, 2, 3, 8, 4, 5), (2, 4, 3, 8, 7, 9, 8), (3, 6, 1, 9, 2, 3, 6), (4, 10, 8, 6, 9, 4, 5), (5, 9, 2, 7, 10, 7, 3), (6, 1, 1, 4, 2, 8, 4) )).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7") // VectorAssembler类的作用是将多个特征列合并为一列 val assembler1 = new VectorAssembler(). setInputCols(Array("id2", "id3", "id4")). setOutputCol("vec1") val assembled1 = assembler1.transform(df) val assembler2 = new VectorAssembler(). setInputCols(Array("id5", "id6", "id7")). setOutputCol("vec2") val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2") val interaction = new Interaction() .setInputCols(Array("id1", "vec1", "vec2")) .setOutputCol("interactedCol") val interacted = interaction.transform(assembled2) interacted.show(false) 输出: +---+--------------+--------------+------------------------------------------------------+ |id1|vec1 |vec2 |interactedCol | +---+--------------+--------------+------------------------------------------------------+ |1 |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0] | |2 |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0] | |3 |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0] | |4 |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]| |5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0] | |6 |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0] | +---+--------------+--------------+------------------------------------------------------+
三、Normalizer
Normalizer 是一个 Transformer,它转换 Vector 行的数据集,将每个 Vector 规范化为具有单位范数。 它接受参数 p,它指定用于归一化的 p 范数。 (默认情况下 p=2。)这种标准化可以帮助标准化您的输入数据并改善学习算法的行为。
%spark // 特征转换 —— —— Normalizer // 使用给定的 p 范数将向量归一化以具有单位范数。 import org.apache.spark.ml.feature.Normalizer import org.apache.spark.ml.linalg.Vectors val dataFrame = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 0.5, -1.0)), (1, Vectors.dense(2.0, 1.0, 1.0)), (2, Vectors.dense(4.0, 10.0, 2.0)) )).toDF("id", "features") // Normalize each Vector using $L^1$ norm. val normalizer = new Normalizer() .setInputCol("features") .setOutputCol("normFeatures") // Lp 空间中的归一化。 必须大于等于 1。(默认值:p = 2) .setP(1.0) val l1NormData = normalizer.transform(dataFrame) println("Normalized using L^1 norm") l1NormData.show() // 使用 $L^\infty$ 范数对每个向量进行归一化。 val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity) println("Normalized using L^inf norm") lInfNormData.show() 输出: Normalized using L^1 norm +---+--------------+------------------+ | id| features| normFeatures| +---+--------------+------------------+ | 0|[1.0,0.5,-1.0]| [0.4,0.2,-0.4]| | 1| [2.0,1.0,1.0]| [0.5,0.25,0.25]| | 2|[4.0,10.0,2.0]|[0.25,0.625,0.125]| +---+--------------+------------------+ Normalized using L^inf norm +---+--------------+--------------+ | id| features| normFeatures| +---+--------------+--------------+ | 0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]| | 1| [2.0,1.0,1.0]| [1.0,0.5,0.5]| | 2|[4.0,10.0,2.0]| [0.4,1.0,0.2]| +---+--------------+--------------+
四、StandardScaler
StandardScaler 转换向量行的数据集,得经过处理的数据符合标准正态分布,即均值为0,标准差为1.
为什么要归一化?
- 归一化后加快了梯度下降求最优解的速度,如果机器学习模型使用梯度下降法求最优解时,归一化往往非常有必要,否则很难收敛甚至不能收敛;
- 归一化有可能提高精度,一些分类器需要计算样本之间的距离(如欧氏距离),例如KNN。如果一个特征值域范围非常大,那么距离计算就主要取决于这个特征,从而与实际情况相悖(比如这时实际情况是值域范围小的特征更重要)。
- 概率模型(树形模型)不需要归一化,因为它们不关心变量的值,而是关心变量的分布和变量之间的条件概率,如决策树、RF。而像Adaboost、SVM、LR、Knn、KMeans之类的最优化问题就需要归一化。
%spark // 特征转换 —— —— StandardScaler // 通过使用训练集中样本的列汇总统计数据去除均值并缩放到单位方差来标准化特征。 // “单位标准差”是使用校正后的样本标准差计算的,该标准差是作为无偏样本方差的平方根计算的。 import org.apache.spark.ml.feature.StandardScaler val dataFrame = spark.read.format("libsvm").load("/data/mllib/sample_libsvm_data.txt") val scaler = new StandardScaler() .setInputCol("features") .setOutputCol("scaledFeatures") // 是否将数据缩放到单位标准差。 默认值:true .setWithStd(true) // 缩放前是否以均值居中数据。 它将构建一个密集的输出,因此在应用于稀疏输入时要小心。 默认值:false .setWithMean(false) // Compute summary statistics by fitting the StandardScaler. val scalerModel = scaler.fit(dataFrame) // Normalize each feature to have unit standard deviation. val scaledData = scalerModel.transform(dataFrame)
//默认输出20列 scaledData.show() 输出: +-----+--------------------+--------------------+ |label| features| scaledFeatures| +-----+--------------------+--------------------+ | 0.0|(692,[127,128,129...|(692,[127,128,129...| | 1.0|(692,[158,159,160...|(692,[158,159,160...| | 1.0|(692,[124,125,126...|(692,[124,125,126...| | 1.0|(692,[152,153,154...|(692,[152,153,154...| | 1.0|(692,[151,152,153...|(692,[151,152,153...| | 0.0|(692,[129,130,131...|(692,[129,130,131...| | 1.0|(692,[158,159,160...|(692,[158,159,160...| | 1.0|(692,[99,100,101,...|(692,[99,100,101,...| | 0.0|(692,[154,155,156...|(692,[154,155,156...| | 0.0|(692,[127,128,129...|(692,[127,128,129...| | 1.0|(692,[154,155,156...|(692,[154,155,156...| | 0.0|(692,[153,154,155...|(692,[153,154,155...| | 0.0|(692,[151,152,153...|(692,[151,152,153...| | 1.0|(692,[129,130,131...|(692,[129,130,131...| | 0.0|(692,[154,155,156...|(692,[154,155,156...| | 1.0|(692,[150,151,152...|(692,[150,151,152...| | 0.0|(692,[124,125,126...|(692,[124,125,126...| | 0.0|(692,[152,153,154...|(692,[152,153,154...| | 1.0|(692,[97,98,99,12...|(692,[97,98,99,12...| | 1.0|(692,[124,125,126...|(692,[124,125,126...| +-----+--------------------+--------------------+
五、RobustScaler
使用对异常值具有鲁棒性的统计数据来缩放特征。 RobustScaler 移除中位数并根据分位数范围缩放数据。分位数范围默认为 IQR(四分位数间距,第 1 个四分位数 = 第 25 个分位数和第 3 个四分位数 = 第 75 个分位数之间的分位数范围),但可以配置。通过计算训练集中样本的相关统计数据,居中和缩放在每个特征上独立发生。然后使用变换方法存储中位数和分位数范围以用于以后的数据。数据集的标准化是许多机器学习估计器的共同要求。通常这是通过去除均值并缩放到单位方差来完成的。但是,异常值通常会对样本均值/方差产生负面影响。在这种情况下,中位数和分位数范围通常会给出更好的结果。请注意,在计算中位数和范围时会忽略 NaN 值。
RobustScaler 是一个 Estimator,可以适合数据集以生成 RobustScalerModel;这相当于计算分位数统计。然后,模型可以将数据集中的向量列转换为具有单位分位数范围和/或零中值特征。
注意:如果一个特征的分位数范围为零,它将在该特征的向量中返回默认的 0.0 值。
%spark // 特征转换 —— —— RobustScaler // RobustScaler 转换向量行的数据集,删除中值并根据特定的分位数范围(默认情况下 IQR:四分位数范围,第一个四分位数和第三个四分位数之间的分位数范围)缩放数据。 // 它的行为与 StandardScaler非常相似,但是使用中位数和分位数范围而不是平均值和标准差,这使其对异常值具有鲁棒性。 import org.apache.spark.ml.feature.RobustScaler val dataFrame = spark.read.format("libsvm").load("/data/mllib/sample_libsvm_data.txt") val scaler = new RobustScaler() .setInputCol("features") .setOutputCol("scaledFeatures") // 是否将数据缩放到分位数范围。默认值:true .setWithScaling(true) // 缩放前是否以中位数居中数据。它将构建一个密集的输出,因此在应用于稀疏输入时要小心。默认值:false .setWithCentering(false) // 下分位数计算分位数范围,由所有特征共享,默认值:0.25 .setLower(0.25) // 用于计算分位数范围的上分位数,由所有特征共享,默认值:0.75 .setUpper(0.75) // 通过拟合 RobustScaler 计算汇总统计数据 val scalerModel = scaler.fit(dataFrame) // 将每个特征转换为具有单位分位数范围 val scaledData = scalerModel.transform(dataFrame) scaledData.show() 输出: +-----+--------------------+--------------------+ |label| features| scaledFeatures| +-----+--------------------+--------------------+ | 0.0|(692,[127,128,129...|(692,[127,128,129...| | 1.0|(692,[158,159,160...|(692,[158,159,160...| | 1.0|(692,[124,125,126...|(692,[124,125,126...| | 1.0|(692,[152,153,154...|(692,[152,153,154...| | 1.0|(692,[151,152,153...|(692,[151,152,153...| | 0.0|(692,[129,130,131...|(692,[129,130,131...| | 1.0|(692,[158,159,160...|(692,[158,159,160...| | 1.0|(692,[99,100,101,...|(692,[99,100,101,...| | 0.0|(692,[154,155,156...|(692,[154,155,156...| | 0.0|(692,[127,128,129...|(692,[127,128,129...| | 1.0|(692,[154,155,156...|(692,[154,155,156...| | 0.0|(692,[153,154,155...|(692,[153,154,155...| | 0.0|(692,[151,152,153...|(692,[151,152,153...| | 1.0|(692,[129,130,131...|(692,[129,130,131...| | 0.0|(692,[154,155,156...|(692,[154,155,156...| | 1.0|(692,[150,151,152...|(692,[150,151,152...| | 0.0|(692,[124,125,126...|(692,[124,125,126...| | 0.0|(692,[152,153,154...|(692,[152,153,154...| | 1.0|(692,[97,98,99,12...|(692,[97,98,99,12...| | 1.0|(692,[124,125,126...|(692,[124,125,126...| +-----+--------------------+--------------------+ only showing top 20 rows
六、MinMaxScaler
MinMaxScaler 转换向量行的数据集,将每个特征重新缩放到特定范围(通常为 [0, 1])。
注意:由于零值可能会转换为非零值,因此即使对于稀疏输入,转换器的输出也将是 DenseVector。
%spark // 特征转换 —— —— MinMaxScaler // 使用列摘要统计将每个特征单独重新缩放到一个公共范围 [min, max],这也称为最小-最大归一化或重新缩放 import org.apache.spark.ml.feature.MinMaxScaler import org.apache.spark.ml.linalg.Vectors // 根据每一列数据进行缩放计算 val dataFrame = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 0.1, -1.0)), (1, Vectors.dense(2.0, 1.1, 1.0)), (2, Vectors.dense(3.0, 10.1, 3.0)) )).toDF("id", "features") val scaler = new MinMaxScaler() .setInputCol("features") .setOutputCol("scaledFeatures") // 转换后的下限,由所有特征共享默认值:0.0 .setMin(0) //转换后的上限,由所有特征共享默认值:1.0 .setMax(1) // 计算汇总统计并生成 MinMaxScalerModel val scalerModel = scaler.fit(dataFrame) // 将每个特征重新缩放到范围 [min, max]。 val scaledData = scalerModel.transform(dataFrame) scaledData.select("features", "scaledFeatures").show(false) 输出: +--------------+--------------+ |features |scaledFeatures| +--------------+--------------+ |[1.0,0.1,-1.0]|(3,[],[]) | |[2.0,1.1,1.0] |[0.5,0.1,0.5] | |[3.0,10.1,3.0]|[1.0,1.0,1.0] | +--------------+--------------+
七、MaxAbsScaler
MaxAbsScaler 转换向量行的数据集,通过除以每个特征中的最大绝对值将每个特征重新缩放到范围 [-1, 1]。 它不会移动/居中数据,因此不会破坏任何稀疏性。
MaxAbsScaler 计算数据集的汇总统计数据并生成 MaxAbsScalerModel。 然后模型可以将每个特征单独转换为范围 [-1, 1]。
%spark // 特征转换 —— —— MaxAbsScaler // 通过除以每个特征中的最大绝对值,将每个特征单独重新缩放到范围 [-1, 1]。 它不会移动或者居中数据,因此不会破坏任何稀疏性。 import org.apache.spark.ml.feature.MaxAbsScaler import org.apache.spark.ml.linalg.Vectors val dataFrame = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 0.1, -8.0)), (1, Vectors.dense(2.0, 1.0, -4.0)), (2, Vectors.dense(4.0, 10.0, 8.0)) )).toDF("id", "features") val scaler = new MaxAbsScaler() .setInputCol("features") .setOutputCol("scaledFeatures") // Compute summary statistics and generate MaxAbsScalerModel val scalerModel = scaler.fit(dataFrame) // rescale each feature to range [-1, 1] val scaledData = scalerModel.transform(dataFrame) scaledData.select("features", "scaledFeatures").show(false) 输出: +--------------+--------------------------------+ |features |scaledFeatures | +--------------+--------------------------------+ |[1.0,0.1,-8.0]|[0.25,0.010000000000000002,-1.0]| |[2.0,1.0,-4.0]|[0.5,0.1,-0.5] | |[4.0,10.0,8.0]|[1.0,1.0,1.0] | +--------------+--------------------------------+
八、Bucketizer
Bucketizer 将一列连续特征转换为一列特征桶,其中桶由用户指定。
它需要一个参数:
splits:用于将连续特征映射到桶中的参数。使用 n+1 个拆分,有 n 个桶。由拆分 x,y 定义的存储桶保存范围 [x,y) 中的值,除了最后一个存储桶,它还包括 y。拆分应该严格增加。必须明确提供 -inf、inf 处的值以涵盖所有 Double 值;
否则,指定拆分之外的值将被视为错误。拆分的两个示例是 Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) 和 Array(0.0, 1.0, 2.0)。
注意,如果不知道目标列的上限和下限,则应添加 Double.NegativeInfinity 和 Double.PositiveInfinity 作为拆分的边界,以防止潜在的超出 Bucketizer 边界异常。
另外需要注意,提供的拆分必须严格按照递增顺序,即 s0 < s1 < s2 < ... < sn。
%spark // 特征转换 —— —— Bucketizer // Bucketizer 将一列连续特征映射到一列特征桶。 import org.apache.spark.ml.feature.Bucketizer val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity) val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9) val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features") val bucketizer = new Bucketizer() .setInputCol("features") .setOutputCol("bucketedFeatures") .setSplits(splits) // 包含参数"skip","keep","error",默认:"error" .setHandleInvalid("error") // 将原始数据转换为其存储桶索引。 val bucketedData = bucketizer.transform(dataFrame) println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets") bucketedData.show() val splitsArray = Array( Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity), Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity)) val data2 = Array( (-999.9, -999.9), (-0.5, -0.2), (-0.3, -0.1), (0.0, 0.0), (0.2, 0.4), (999.9, 999.9)) val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2") // 输入多列参数 val bucketizer2 = new Bucketizer() .setInputCols(Array("features1", "features2")) .setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2")) .setSplitsArray(splitsArray) // 将原始数据转换为其存储桶索引。 val bucketedData2 = bucketizer2.transform(dataFrame2) bucketedData2.show() 输出: +--------+----------------+ |features|bucketedFeatures| +--------+----------------+ | -999.9| 0.0| | -0.5| 1.0| | -0.3| 1.0| | 0.0| 2.0| | 0.2| 2.0| | 999.9| 3.0| +--------+----------------+ +---------+---------+-----------------+-----------------+ |features1|features2|bucketedFeatures1|bucketedFeatures2| +---------+---------+-----------------+-----------------+ | -999.9| -999.9| 0.0| 0.0| | -0.5| -0.2| 1.0| 1.0| | -0.3| -0.1| 1.0| 1.0| | 0.0| 0.0| 2.0| 2.0| | 0.2| 0.4| 2.0| 3.0| | 999.9| 999.9| 3.0| 3.0| +---------+---------+-----------------+-----------------+
九、ElementwiseProduct
ElementwiseProduct 使用元素乘法将每个输入向量乘以提供的“权重”向量。 换句话说,它通过标量乘数缩放数据集的每一列。 这表示输入向量 v 和转换向量 w 之间的 Hadamard 积,以产生结果向量。
%spark // 特征转换 —— —— ElementwiseProduct // 使用提供的“权重”向量输出每个输入向量的 Hadamard 乘积(即元素乘积)。 换句话说,它通过标量乘数缩放数据集的每一列。 import org.apache.spark.ml.feature.ElementwiseProduct import org.apache.spark.ml.linalg.Vectors // 创建一些矢量数据; 也适用于稀疏向量 val dataFrame = spark.createDataFrame(Seq( ("a", Vectors.dense(1.0, 2.0, 3.0)), ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector") val transformingVector = Vectors.dense(0.0, 1.0, 2.0) val transformer = new ElementwiseProduct() // 与输入向量相乘的向量 .setScalingVec(transformingVector) .setInputCol("vector") .setOutputCol("transformedVector") // 批量转换向量以创建新列: transformer.transform(dataFrame).show() 输出: +---+-------------+-----------------+ | id| vector|transformedVector| +---+-------------+-----------------+ | a|[1.0,2.0,3.0]| [0.0,2.0,6.0]| | b|[4.0,5.0,6.0]| [0.0,5.0,12.0]| +---+-------------+-----------------+
十、SQLTransformer
SQLTransformer 实现由 SQL 语句定义的转换。 目前,我们只支持像“SELECT ... FROM __THIS__ ...”这样的 SQL 语法,其中“__THIS__”表示输入数据集的基础表。 select 子句指定要在输出中显示的字段、常量和表达式,并且可以是 Spark SQL 支持的任何 select 子句。 用户还可以使用 Spark SQL 内置函数和 UDF 对这些选定的列进行操作。 例如,SQLTransformer 支持如下语句:
-
SELECT a, a + b AS a_b FROM __THIS__
-
SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
-
SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
%spark // 特征转换 —— —— SQLTransformer import org.apache.spark.ml.feature.SQLTransformer val df = spark.createDataFrame( Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2") val sqlTrans = new SQLTransformer().setStatement( "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__") sqlTrans.transform(df).show() 输出: +---+---+---+---+----+ | id| v1| v2| v3| v4| +---+---+---+---+----+ | 0|1.0|3.0|4.0| 3.0| | 2|2.0|5.0|7.0|10.0| +---+---+---+---+----+