Scala: How to create correct data frame for classification in Spark ML
Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/31028806/
Asked by Dusan Grubjesic
I am trying to run a random forest classification using the Spark ML API, but I am having trouble creating the right data frame input for the pipeline.
Here is the sample data:
age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","male","B"
31,50,"masters","female","B"
42,40,"bachelors","male","B"
age and hours_per_week are integers, while the other features, including the label salaryRange, are categorical (String).
Loading this CSV file (let's call it sample.csv) can be done with the Spark CSV library like this:
val data = sqlContext.csvFile("/home/dusan/sample.csv")
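(As an aside, my addition rather than part of the original question: the same load can be expressed through the generic DataFrame reader, assuming the spark-csv package is on the classpath:)
val data = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true") // first line holds the column names
  .load("/home/dusan/sample.csv")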
By default all columns are imported as strings, so we need to change "age" and "hours_per_week" to Int:
val toInt = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))
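(Alternatively, a minimal sketch using the built-in Column.cast instead of a UDF; unlike the UDF, unparseable values become null rather than throwing:)
val dataCast = data
  .withColumn("age", data("age").cast("int"))
  .withColumn("hours_per_week", data("hours_per_week").cast("int"))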
Just to check how the schema looks now:
scala> dataFixed.printSchema
root
|-- age: integer (nullable = true)
|-- hours_per_week: integer (nullable = true)
|-- education: string (nullable = true)
|-- sex: string (nullable = true)
|-- salaryRange: string (nullable = true)
Then let's set up the cross validator and pipeline:
val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)
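(Note, my addition: CrossValidator also expects a parameter grid via setEstimatorParamMaps before fit can run; a minimal sketch, assuming org.apache.spark.ml.tuning.ParamGridBuilder is imported:)
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.maxDepth, Array(3, 5)) // illustrative values, not tuned
  .build()
cv.setEstimatorParamMaps(paramGrid)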
The error shows up when running this line:
val cmModel = cv.fit(dataFixed)
java.lang.IllegalArgumentException: Field "features" does not exist.
It is possible to set the label column and the feature column in RandomForestClassifier; however, I have 4 columns as predictors (features), not just one.
How should I organize my data frame so that the label and features columns are set up correctly?
For your convenience, here is the full code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
object SampleClassification {
def main(args: Array[String]): Unit = {
//set spark context
val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import com.databricks.spark.csv._
//load data by using databricks "Spark CSV Library"
val data = sqlContext.csvFile("/home/dusan/sample.csv")
//by default all columns are imported as string so we need to change "age" and "hours_per_week" to Int
val toInt = udf[Int, String]( _.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week",toInt(data("hours_per_week")))
val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)
// this fails with error
//java.lang.IllegalArgumentException: Field "features" does not exist.
val cmModel = cv.fit(dataFixed)
}
}
Thanks for the help!
Accepted answer by tuxdna
You simply need to make sure that you have a "features" column in your dataframe that is of type VectorUDT, as shown below:
scala> val df2 = dataFixed.withColumnRenamed("age", "features")
df2: org.apache.spark.sql.DataFrame = [features: int, hours_per_week: int, education: string, sex: string, salaryRange: string]
scala> val cmModel = cv.fit(df2)
java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.mllib.linalg.VectorUDT@1eef but was actually IntegerType.
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50)
at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:118)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema.apply(Pipeline.scala:164)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema.apply(Pipeline.scala:164)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:164)
at org.apache.spark.ml.tuning.CrossValidator.transformSchema(CrossValidator.scala:142)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
at org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:107)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
EDIT1
Essentially, there need to be two fields in your data frame: "features" for the feature vector and "label" for the instance labels. The label must be of type Double.
To create a "features" field of Vector type, first create a udf as shown below:
val toVec4 = udf[Vector, Int, Int, String, String] { (a, b, c, d) =>
  // encode education as an ordinal value (any other string would throw a MatchError)
  val e3 = c match {
    case "hs-grad"   => 0
    case "bachelors" => 1
    case "masters"   => 2
  }
  // encode sex as 0/1
  val e4 = d match { case "male" => 0 case "female" => 1 }
  Vectors.dense(a, b, e3, e4)
}
Now, to also encode the "label" field, create another udf as shown below:
val encodeLabel = udf[Double, String]( _ match { case "A" => 0.0 case "B" => 1.0} )
Now we transform the original dataframe using these two udfs:
val df = dataFixed.withColumn(
"features",
toVec4(
dataFixed("age"),
dataFixed("hours_per_week"),
dataFixed("education"),
dataFixed("sex")
)
).withColumn("label", encodeLabel(dataFixed("salaryRange"))).select("features", "label")
Note that there can be extra columns/fields present in the dataframe, but in this case I have selected only features and label:
scala> df.show()
+-------------------+-----+
| features|label|
+-------------------+-----+
|[38.0,40.0,0.0,0.0]| 0.0|
|[28.0,40.0,1.0,1.0]| 0.0|
|[52.0,45.0,0.0,0.0]| 1.0|
|[31.0,50.0,2.0,1.0]| 1.0|
|[42.0,40.0,1.0,0.0]| 1.0|
+-------------------+-----+
Now it's up to you to set the correct parameters for your learning algorithm to make it work.
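(For example, a minimal sketch with illustrative, untuned values; the classifier can also be pointed at the two columns explicitly:)
val rf = new RandomForestClassifier()
  .setLabelCol("label")       // "label" is already the default
  .setFeaturesCol("features") // "features" is already the default
  .setNumTrees(20)            // illustrative value, not tuned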
Answered by u2130573
As of Spark 1.4, you can use the Transformer org.apache.spark.ml.feature.VectorAssembler. Just provide the column names you want to use as features.
val assembler = new VectorAssembler()
.setInputCols(Array("col1", "col2", "col3"))
.setOutputCol("features")
and add it to your pipeline.
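(Applied to the question's data, a minimal sketch might look like the following; the StringIndexer stages and their ordering are my assumption, not part of the original answer:)
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
// index the categorical columns and the label, then assemble the feature vector
val eduIndexer = new StringIndexer().setInputCol("education").setOutputCol("educationIdx")
val sexIndexer = new StringIndexer().setInputCol("sex").setOutputCol("sexIdx")
val labelIndexer = new StringIndexer().setInputCol("salaryRange").setOutputCol("label")
val assembler = new VectorAssembler()
  .setInputCols(Array("age", "hours_per_week", "educationIdx", "sexIdx"))
  .setOutputCol("features")
val rf = new RandomForestClassifier() // reads "features" and "label" by default
val pipeline = new Pipeline().setStages(
  Array(eduIndexer, sexIndexer, labelIndexer, assembler, rf))
val model = pipeline.fit(dataFixed)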
Answered by Adriano Almeida
According to the Spark documentation on MLlib random forests / decision trees, it seems to me that you should define the map of categorical features that you are using, and the points should be LabeledPoints.
This will tell the algorithm which column should be used as the prediction target and which ones are the features.
https://spark.apache.org/docs/latest/mllib-decision-tree.html
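(A minimal sketch of that RDD-based MLlib approach, my illustration, reusing the sample rows and the encodings from the accepted answer; sc is the SparkContext from the question's code:)
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
// label first (A -> 0.0, B -> 1.0), then features (age, hours_per_week, education, sex)
val trainingData = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(38.0, 40.0, 0.0, 0.0)),
  LabeledPoint(0.0, Vectors.dense(28.0, 40.0, 1.0, 1.0)),
  LabeledPoint(1.0, Vectors.dense(52.0, 45.0, 0.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(31.0, 50.0, 2.0, 1.0)),
  LabeledPoint(1.0, Vectors.dense(42.0, 40.0, 1.0, 0.0))))
// features 2 (education, 3 categories) and 3 (sex, 2 categories) are categorical
val categoricalFeaturesInfo = Map(2 -> 3, 3 -> 2)
// args: input, numClasses, categoricalFeaturesInfo, numTrees,
//       featureSubsetStrategy, impurity, maxDepth, maxBins
val model = RandomForest.trainClassifier(trainingData, 2, categoricalFeaturesInfo,
  20, "auto", "gini", 5, 32)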

