scala - Spark UnsupportedOperationException: empty collection
Original source: http://stackoverflow.com/questions/27053036/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): Stack Overflow.
Spark UnsupportedOperationException: empty collection
Asked by user3731845
Does anyone know the possible causes of this error when trying to run Spark MLlib ALS using the hands-on lab provided by Databricks?
14/11/20 23:33:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/11/20 23:33:39 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
Got 27980 ratings from 24071 users on 4211 movies.
Training: 27989, validation: 0, test: 0
Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD$$anonfun$reduce.apply(RDD.scala:806)
    at org.apache.spark.rdd.RDD$$anonfun$reduce.apply(RDD.scala:806)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:806)
    at MovieLensALS$.computeRmse(MovieLensALS.scala:149)
    at MovieLensALS$$anonfun$main$$anonfun$apply$mcVI$sp$$anonfun$apply$mcVD$sp.apply$mcVI$sp(MovieLensALS.scala:95)
    at MovieLensALS$$anonfun$main$$anonfun$apply$mcVI$sp$$anonfun$apply$mcVD$sp.apply(MovieLensALS.scala:93)
    at MovieLensALS$$anonfun$main$$anonfun$apply$mcVI$sp$$anonfun$apply$mcVD$sp.apply(MovieLensALS.scala:93)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at MovieLensALS$$anonfun$main$$anonfun$apply$mcVI$sp.apply$mcVD$sp(MovieLensALS.scala:93)
    at MovieLensALS$$anonfun$main$$anonfun$apply$mcVI$sp.apply(MovieLensALS.scala:93)
    at MovieLensALS$$anonfun$main$$anonfun$apply$mcVI$sp.apply(MovieLensALS.scala:93)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at MovieLensALS$$anonfun$main.apply$mcVI$sp(MovieLensALS.scala:93)
    at MovieLensALS$$anonfun$main.apply(MovieLensALS.scala:93)
    at MovieLensALS$$anonfun$main.apply(MovieLensALS.scala:93)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at MovieLensALS$.main(MovieLensALS.scala:93)
    at MovieLensALS.main(MovieLensALS.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
UPDATE: Sure thing! I am using this class. It is available at https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html and https://databricks-training.s3.amazonaws.com/getting-started.html#additional-required-download. Let me know if there is something else that could help.
import java.io.File
import scala.io.Source
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
object MovieLensALS {

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    if (args.length != 2) {
      println("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g --class MovieLensALS " +
        "target/scala-*/movielens-als-assembly-*.jar movieLensHomeDir personalRatingsFile")
      sys.exit(1)
    }

    // set up environment
    val conf = new SparkConf()
      .setAppName("MovieLensALS")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    // load personal ratings
    val myRatings = loadRatings(args(1))
    val myRatingsRDD = sc.parallelize(myRatings, 1)

    // load ratings and movie titles
    val movieLensHomeDir = args(0)

    val ratings = sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line =>
      val fields = line.split("::")
      // format: (timestamp % 10, Rating(userId, movieId, rating))
      (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))
    }

    val movies = sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line =>
      val fields = line.split("::")
      // format: (movieId, movieName)
      (fields(0).toInt, fields(1))
    }.collect().toMap

    val numRatings = ratings.count()
    val numUsers = ratings.map(_._2.user).distinct().count()
    val numMovies = ratings.map(_._2.product).distinct().count()

    println("Got " + numRatings + " ratings from "
      + numUsers + " users on " + numMovies + " movies.")

    // split ratings into train (60%), validation (20%), and test (20%) based on the
    // last digit of the timestamp, add myRatings to train, and cache them
    val numPartitions = 4
    val training = ratings.filter(x => x._1 < 6)
      .values
      .union(myRatingsRDD)
      .repartition(numPartitions)
      .cache()
    val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8)
      .values
      .repartition(numPartitions)
      .cache()
    val test = ratings.filter(x => x._1 >= 8).values.cache()

    val numTraining = training.count()
    val numValidation = validation.count()
    val numTest = test.count()

    println("Training: " + numTraining + ", validation: " + numValidation + ", test: " + numTest)

    // train models and evaluate them on the validation set
    val ranks = List(8, 12)
    val lambdas = List(0.1, 10.0)
    val numIters = List(10, 20)
    var bestModel: Option[MatrixFactorizationModel] = None
    var bestValidationRmse = Double.MaxValue
    var bestRank = 0
    var bestLambda = -1.0
    var bestNumIter = -1
    for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
      val model = ALS.train(training, rank, numIter, lambda)
      val validationRmse = computeRmse(model, validation, numValidation)
      println("RMSE (validation) = " + validationRmse + " for the model trained with rank = "
        + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
      if (validationRmse < bestValidationRmse) {
        bestModel = Some(model)
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lambda
        bestNumIter = numIter
      }
    }

    // evaluate the best model on the test set
    val testRmse = computeRmse(bestModel.get, test, numTest)
    println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
      + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")

    // create a naive baseline and compare it with the best model
    val meanRating = training.union(validation).map(_.rating).mean
    val baselineRmse =
      math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).mean)
    val improvement = (baselineRmse - testRmse) / baselineRmse * 100
    println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")

    // make personalized recommendations
    val myRatedMovieIds = myRatings.map(_.product).toSet
    val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)
    val recommendations = bestModel.get
      .predict(candidates.map((0, _)))
      .collect()
      .sortBy(- _.rating)
      .take(50)

    var i = 1
    println("Movies recommended for you:")
    recommendations.foreach { r =>
      println("%2d".format(i) + ": " + movies(r.product))
      i += 1
    }

    // clean up
    sc.stop()
  }

  /** Compute RMSE (Root Mean Squared Error). */
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
    val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
      .join(data.map(x => ((x.user, x.product), x.rating)))
      .values
    math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) / n)
  }

  /** Load ratings from file. */
  def loadRatings(path: String): Seq[Rating] = {
    val lines = Source.fromFile(path).getLines()
    val ratings = lines.map { line =>
      val fields = line.split("::")
      Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
    }.filter(_.rating > 0.0)
    if (ratings.isEmpty) {
      sys.error("No ratings provided.")
    } else {
      ratings.toSeq
    }
  }
}
Answered by asu
Probably it's because of some filter that you (or the computeRmse method) are using, so reduce gets called on an empty collection/RDD and the "empty collection" exception is thrown. Try double-checking your filters and the computeRmse() function.
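For example, here is a defensive variant of the question's computeRmse (a sketch only; computeRmseSafe is my name, not part of the lab code, though it reuses the question's types and field names). It replaces reduce with fold, which returns the zero value on an empty RDD instead of throwing, and fails with a clearer message when n is zero:

/** Like computeRmse, but does not throw "empty collection" on an empty RDD. */
def computeRmseSafe(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
  require(n > 0, "cannot compute RMSE against an empty validation/test set")
  val predictions = model.predict(data.map(x => (x.user, x.product)))
  val predictionsAndRatings = predictions.map(x => ((x.user, x.product), x.rating))
    .join(data.map(x => ((x.user, x.product), x.rating)))
    .values
  // fold(0.0) yields 0.0 on an empty RDD, where reduce would throw
  // UnsupportedOperationException: empty collection
  val sumSquaredErrors = predictionsAndRatings
    .map(x => (x._1 - x._2) * (x._1 - x._2))
    .fold(0.0)(_ + _)
  math.sqrt(sumSquaredErrors / n)
}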
Answered by chelBert
I ran into the same problem working with the same example. The problem was that the training data I was using wasn't large enough and didn't have enough repeated values. The ALS model can only predict (user, product) ID pairs that were present in the training data (it is somewhat different from other machine learning algorithms in that way). So if every pair in the validation set contains an ID that wasn't in the training set, the prediction RDD will be empty (since the model can't predict any of those values), and the reduce transformation in the RMSE method will throw this exception. To avoid this you should:

A) not use this algorithm without sufficient training data, and

B) check, before entering the "finding the best model" loop, that your validation set can actually be scored against this training set, as sketched below.
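A sketch of check (B), using the question's training and validation RDDs (the trainingUsers/trainingProducts/usableValidation names are mine): keep only the validation pairs whose user and product both appear in the training set, so predict() has something to score.

// Collecting the ID sets to the driver is fine at MovieLens scale;
// broadcast them (or use joins) for much larger data.
val trainingUsers = training.map(_.user).distinct().collect().toSet
val trainingProducts = training.map(_.product).distinct().collect().toSet
val usableValidation = validation.filter(r =>
  trainingUsers.contains(r.user) && trainingProducts.contains(r.product))
println("Usable validation ratings: " + usableValidation.count() + " of " + validation.count())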
Lastly, if you are productizing this algorithm, make sure you don't ship the best model returned by this loop, because it likely won't include all of your user and product IDs; if that's the case, you are restricting which new users and products you can predict for. What I would recommend is to use this logic only to find the best training parameters, and then retrain the model on all of the data with those parameters and use that.
Answered by radek1st
I came across the exact same exception. In my case, it was a bug in my code that resulted in the actual ratings RDD having size zero :) By passing an empty ratings RDD to ALS.train, I definitely deserved to get UnsupportedOperationException: empty collection.
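If you suspect the same thing, a cheap sanity check before calling ALS.train makes the failure obvious. This is a sketch assuming the question's training RDD and loop variables (rank, numIter, lambda); count() is used because it works on any Spark version, at the cost of triggering a job:

// Fail fast with a clear message instead of letting ALS or reduce blow up later.
val trainingSize = training.count()
require(trainingSize > 0, "training RDD is empty - check the input path and the filters feeding it")
println("Training ALS on " + trainingSize + " ratings")
val model = ALS.train(training, rank, numIter, lambda)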

