scala | Spark dataframes: Extract a column based on the value of another column

Disclaimer: this page is a Chinese/English parallel translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it you must follow the same CC BY-SA terms, link the original, and attribute it to the original authors (not me): http://stackoverflow.com/questions/33638325/

Date: 2020-10-22 07:47:36  Source: igfitidea

Tags: scala, apache-spark, dataframe, apache-spark-sql

Asked by TomTom101

I have a dataframe with transactions with a joined price list:

+----------+----------+------+-------+-------+
|   paid   | currency | EUR  |  USD  |  GBP  |
+----------+----------+------+-------+-------+
|   49.5   |   EUR    | 99   |  79   |  69   |
+----------+----------+------+-------+-------+

A customer has paid 49.5 in EUR, as shown in the "currency" column. I now want to compare that paid price with the price from the price list.

Therefore, I need to access the correct column based on the value of "currency", like so:

df.withColumn("saved", df.col(df.col($"currency")) - df.col("paid"))

which I hoped would become

df.withColumn("saved", df.col("EUR") - df.col("paid"))

This fails, however. I tried everything I could imagine, including a UDF, and got nowhere.

I guess there is some elegant solution for this? Can somebody help out here?

Answered by zero323

Assuming that the column names match values in the currency column:

import org.apache.spark.sql.functions.{lit, col, coalesce}
import org.apache.spark.sql.Column 

// Dummy data
val df = sc.parallelize(Seq(
  (49.5, "EUR", 99, 79, 69), (100.0, "GBP", 80, 120, 50)
)).toDF("paid", "currency", "EUR", "USD", "GBP")

// A list of available currencies 
val currencies: List[String] = List("EUR", "USD", "GBP")

// Select listed value
val listedPrice: Column = coalesce(
  currencies.map(c => when($"currency" === c, col(c)).otherwise(lit(null))): _*)

df.select($"*", (listedPrice - $"paid").alias("difference")).show

// +-----+--------+---+---+---+----------+
// | paid|currency|EUR|USD|GBP|difference|
// +-----+--------+---+---+---+----------+
// | 49.5|     EUR| 99| 79| 69|      49.5|
// |100.0|     GBP| 80|120| 50|     -50.0|
// +-----+--------+---+---+---+----------+
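As a plain-Scala analogy of the coalesce-over-when trick (no Spark required), the idea is to try each currency in order and take the first branch whose name matches the row's currency. The Tx case class and listedPrice helper below are hypothetical names for illustration, not part of the answer's API:

```scala
// Hypothetical sketch: "coalesce over when" as collectFirst over a list.
case class Tx(paid: Double, currency: String, prices: Map[String, Double])

val currencies = List("EUR", "USD", "GBP")

// Return the listed price for the row's currency, if any column matches.
def listedPrice(tx: Tx): Option[Double] =
  currencies.collectFirst { case c if c == tx.currency => tx.prices(c) }

val tx = Tx(49.5, "EUR", Map("EUR" -> 99.0, "USD" -> 79.0, "GBP" -> 69.0))
val saved = listedPrice(tx).map(p => p - tx.paid)  // Some(49.5)
```

Like coalesce, an unmatched currency simply yields no value (None here, null in Spark).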

with the SQL equivalent of the listedPrice expression being something like this:

COALESCE(
  CASE WHEN (currency = 'EUR') THEN EUR ELSE null END,
  CASE WHEN (currency = 'USD') THEN USD ELSE null END,
  CASE WHEN (currency = 'GBP') THEN GBP ELSE null END
)

Alternative using foldLeft:

import org.apache.spark.sql.functions.when

val listedPriceViaFold = currencies.foldLeft(
  lit(null))((acc, c) => when($"currency" === c, col(c)).otherwise(acc))

df.select($"*", (listedPriceViaFold - $"paid").alias("difference")).show

// +-----+--------+---+---+---+----------+
// | paid|currency|EUR|USD|GBP|difference|
// +-----+--------+---+---+---+----------+
// | 49.5|     EUR| 99| 79| 69|      49.5|
// |100.0|     GBP| 80|120| 50|     -50.0|
// +-----+--------+---+---+---+----------+
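Note the nesting order: foldLeft wraps each earlier branch inside the next one, so the last element of the list becomes the outermost check. A hypothetical plain-Scala sketch of that accumulation (illustrative names, no Spark):

```scala
// Hypothetical sketch of the foldLeft accumulation: each step shadows the
// previous result when its currency matches, so the last list element
// ends up as the outermost conditional.
val currencies = List("EUR", "USD", "GBP")

def pick(currency: String, row: Map[String, Double]): Option[Double] =
  currencies.foldLeft(Option.empty[Double]) { (acc, c) =>
    if (currency == c) Some(row(c)) else acc
  }

val row = Map("EUR" -> 99.0, "USD" -> 79.0, "GBP" -> 69.0)
// pick("GBP", row) == Some(69.0); pick("JPY", row) == None
```

Because each branch is mutually exclusive here, the order does not change the result, only the shape of the generated expression.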

where listedPriceViaFold translates to the following SQL:

CASE
  WHEN (currency = 'GBP') THEN GBP
  ELSE CASE
    WHEN (currency = 'USD') THEN USD
    ELSE CASE
      WHEN (currency = 'EUR') THEN EUR
      ELSE null
    END
  END
END

Unfortunately I am not aware of any built-in functions which could directly express SQL like this:

CASE currency
    WHEN 'EUR' THEN EUR
    WHEN 'USD' THEN USD
    WHEN 'GBP' THEN GBP
    ELSE null
END

but you can use this construct in raw SQL.

If my assumption is not true, you can simply add a mapping between the column names and the values in the currency column.

Edit:

Another option, which could be efficient if source supports predicate pushdown and efficient column pruning, is to subset by currency and union:

currencies.map(
  // for each currency, filter and add the difference
  c => df.where($"currency" === c).withColumn("difference", col(c) - $"paid")
).reduce((df1, df2) => df1.unionAll(df2)) // Union

It is equivalent to SQL like this:

SELECT *,  EUR - paid AS difference FROM df WHERE currency = 'EUR'
UNION ALL
SELECT *,  USD - paid AS difference FROM df WHERE currency = 'USD'
UNION ALL
SELECT *,  GBP - paid AS difference FROM df WHERE currency = 'GBP'
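The same split-by-currency-then-union shape can be sketched on ordinary Scala collections (hypothetical Txn case class for illustration; the real answer operates on DataFrames), computing price minus paid as in the SQL above:

```scala
// Hypothetical sketch: one pass per currency, then concatenate the
// results -- mirroring the UNION ALL of per-currency SELECTs.
case class Txn(paid: Double, currency: String, prices: Map[String, Double])

val currencies = List("EUR", "USD", "GBP")
val data = Seq(
  Txn(49.5, "EUR", Map("EUR" -> 99.0, "USD" -> 79.0, "GBP" -> 69.0)),
  Txn(100.0, "GBP", Map("EUR" -> 80.0, "USD" -> 120.0, "GBP" -> 50.0))
)

val withDifference: Seq[(Txn, Double)] = currencies.flatMap { c =>
  data.filter(_.currency == c).map(t => (t, t.prices(c) - t.paid))
}
// differences: 99 - 49.5 = 49.5 and 50 - 100 = -50.0
```

Note that, unlike the single-pass coalesce approach, this scans the data once per currency and may reorder rows, which is also true of the DataFrame version.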

Answered by mehmetminanc

I can't think of a way of doing this with DataFrames, and I doubt there is a simple way, but if you take that table into an RDD:

// Off the top of my head, warn if wrong.
// Would be more elegant with match .. case
def d(l: (Double, String, Int, Int, Int)): Double = {
  if (l._2 == "EUR")
    l._3 - l._1
  else if (l._2 == "USD")
    l._4 - l._1
  else
    l._5 - l._1
}
// df.rdd yields Rows, so convert each Row to a tuple first
val rdd = df.rdd.map(r =>
  (r.getDouble(0), r.getString(1), r.getInt(2), r.getInt(3), r.getInt(4)))
val diff = rdd.map(t => (t, d(t)))

This will most likely raise type errors; I hope you can navigate around those.

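The comment above hints at match .. case; a hypothetical plain-Scala version of d in that style, operating on the same (paid, currency, EUR, USD, GBP) tuple shape, might look like:

```scala
// Hypothetical match-based variant of d. Like the original, it computes
// listed price minus paid, falling back to the GBP column.
def diff(l: (Double, String, Double, Double, Double)): Double = l match {
  case (paid, "EUR", eur, _, _) => eur - paid
  case (paid, "USD", _, usd, _) => usd - paid
  case (paid, _, _, _, gbp)     => gbp - paid
}

// diff((49.5, "EUR", 99.0, 79.0, 69.0)) == 49.5
```

Pattern matching on literals keeps the currency dispatch and the field extraction in one place, which is the elegance the comment was after.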