scala Spark 从一行中提取值

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/33007840/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 07:41:57  来源:igfitidea点击:

Spark extracting values from a Row

scalaapache-sparkapache-spark-sql

提问by Sam

I have the following dataframe

我有以下数据框

val transactions_with_counts = sqlContext.sql(
  """SELECT user_id AS user_id, category_id AS category_id,
  COUNT(category_id) FROM transactions GROUP BY user_id, category_id""")

I'm trying to convert the rows to Rating objects but since x(0) returns an array this fails

我正在尝试将行转换为 Rating 对象,但由于 x(0) 返回一个数组,因此失败

val ratings = transactions_with_counts
  .map(x => Rating(x(0).toInt, x(1).toInt, x(2).toInt))

error: value toInt is not a member of Any

错误:值 toInt 不是 Any 的成员

回答by zero323

Lets start with some dummy data:

让我们从一些虚拟数据开始:

val transactions = Seq((1, 2), (1, 4), (2, 3)).toDF("user_id", "category_id")

val transactions_with_counts = transactions
  .groupBy($"user_id", $"category_id")
  .count

transactions_with_counts.printSchema

// root
// |-- user_id: integer (nullable = false)
// |-- category_id: integer (nullable = false)
// |-- count: long (nullable = false)

There are a few ways to access Rowvalues and keep expected types:

有几种方法可以访问Row值并保留预期类型:

  1. Pattern matching

    import org.apache.spark.sql.Row
    
    transactions_with_counts.map{
      case Row(user_id: Int, category_id: Int, rating: Long) =>
        Rating(user_id, category_id, rating)
    } 
    
  2. Typed get*methods like getInt, getLong:

    transactions_with_counts.map(
      r => Rating(r.getInt(0), r.getInt(1), r.getLong(2))
    )
    
  3. getAsmethod which can use both names and indices:

    transactions_with_counts.map(r => Rating(
      r.getAs[Int]("user_id"), r.getAs[Int]("category_id"), r.getAs[Long](2)
    ))
    

    It can be used to properly extract user defined types, including mllib.linalg.Vector. Obviously accessing by name requires a schema.

  4. Converting to statically typed Dataset(Spark 1.6+ / 2.0+):

    transactions_with_counts.as[(Int, Int, Long)]
    
  1. 模式匹配

    import org.apache.spark.sql.Row
    
    transactions_with_counts.map{
      case Row(user_id: Int, category_id: Int, rating: Long) =>
        Rating(user_id, category_id, rating)
    } 
    
  2. 类型get*方法,如getInt, getLong

    transactions_with_counts.map(
      r => Rating(r.getInt(0), r.getInt(1), r.getLong(2))
    )
    
  3. getAs可以使用名称和索引的方法:

    transactions_with_counts.map(r => Rating(
      r.getAs[Int]("user_id"), r.getAs[Int]("category_id"), r.getAs[Long](2)
    ))
    

    它可用于正确提取用户定义的类型,包括mllib.linalg.Vector. 显然,按名称访问需要一个模式。

  4. 转换为静态类型Dataset(Spark 1.6+ / 2.0+):

    transactions_with_counts.as[(Int, Int, Long)]
    

回答by user-asterix

Using Datasets you can define Ratings as follows:

使用数据集,您可以按如下方式定义评级:

case class Rating(user_id: Int, category_id:Int, count:Long)

The Rating class here has a column name 'count' instead of 'rating' as zero323 suggested. Thus the rating variable is assigned as follows:

这里的 Rating 类有一个列名“count”,而不是像 zero323 建议的那样“rating”。因此,评级变量分配如下:

val transactions_with_counts = transactions.groupBy($"user_id", $"category_id").count

val rating = transactions_with_counts.as[Rating]

This way you will not run into run-time errors in Spark because your Rating class column name is identical to the 'count' column name generated by Spark on run-time.

这样您就不会在 Spark 中遇到运行时错误,因为您的 Rating 类列名称与 Spark 在运行时生成的“计数”列名称相同。

回答by Sarath Avanavu

To access a value of a row of Dataframe, you need to use rdd.collectof Dataframewith for loop.

要访问的行的值数据框,你需要使用rdd.collect数据帧与循环。

Consider your Dataframe looks like below.

考虑您的 Dataframe 如下所示。

val df = Seq(
      (1,"James"),    
      (2,"Albert"),
      (3,"Pete")).toDF("user_id","name")

Use rdd.collecton top of your Dataframe. The rowvariable will contain each row of Dataframeof rddrow type. To get each element from a row, use row.mkString(",")which will contain value of each row in comma separated values. Using splitfunction (inbuilt function) you can access each column value of rddrow with index.

使用rdd.collect您的顶部数据帧。该row变量将包含行类型的Dataframe的每一行rdd。要从一行中获取每个元素,请使用row.mkString(",")which 将以逗号分隔的值包含每行的值。使用split函数(内置函数),您可以rdd使用索引访问行的每一列值。

for (row <- df.rdd.collect)
{   
    var user_id = row.mkString(",").split(",")(0)
    var category_id = row.mkString(",").split(",")(1)       
}

The above code looks little more bigger when compared to dataframe.foreachloops, but you will get more control over your logic by using the above code.

dataframe.foreach循环相比,上面的代码看起来要大一些,但是通过使用上面的代码,您可以更好地控制您的逻辑。