scala 如何根据条件(组中的值)更新列?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/40692025/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to update column based on a condition (a value in a group)?
提问by senthil kumar p
I have the following df:
我有以下 df:
+---+----+-----+
|sno|dept|color|
+---+----+-----+
| 1| fn| red|
| 2| fn| blue|
| 3| fn|green|
+---+----+-----+
If any of the color column values is red, then I all values of the color column should be updated to be red, as below:
如果任何颜色列值为red,则颜色列的所有值都应更新为red,如下所示:
+---+----+-----+
|sno|dept|color|
+---+----+-----+
| 1| fn| red|
| 2| fn| red|
| 3| fn| red|
+---+----+-----+
I could not figure it out. Please help; I have tried following code:
我想不通。请帮忙; 我试过以下代码:
val gp=jdbcDF.filter($"dept".contains("fn"))
//.withColumn("newone",when($"dept"==="fn","RED").otherwise("NULL"))
gp.show()
gp.map(
row=>{
val row1=row.getAs[String](1)
var row2=row.getAs[String](2)
val make=if(row1 =="fn") row2="red"
Row(row(0),row(1),make)
}
).collect().foreach(println)
采纳答案by Jacek Laskowski
Given:
鉴于:
val df = Seq(
(1, "fn", "red"),
(2, "fn", "blue"),
(3, "fn", "green"),
(4, "aa", "blue"),
(5, "aa", "green"),
(6, "bb", "red"),
(7, "bb", "red"),
(8, "aa", "blue")
).toDF("id", "fn", "color")
Do the calculation:
进行计算:
val redOrNot = df.groupBy("fn")
.agg(collect_set('color) as "values")
.withColumn("hasRed", array_contains('values, "red"))
// gives null for no option
val colorPicker = when('hasRed, "red")
val result = df.join(redOrNot, "fn")
.withColumn("resultColor", colorPicker)
.withColumn("color", coalesce('resultColor, 'color)) // skips nulls that leads to the answer
.select('id, 'fn, 'color)
The resultlooks as follows (that seems to be an answer):
在result如下的外观(这似乎是一个答案):
scala> result.show
+---+---+-----+
| id| fn|color|
+---+---+-----+
| 1| fn| red|
| 2| fn| red|
| 3| fn| red|
| 4| aa| blue|
| 5| aa|green|
| 6| bb| red|
| 7| bb| red|
| 8| aa| blue|
+---+---+-----+
You can chain whenoperators and have a default value with otherwise. Consult the scaladoc of whenoperator.
您可以链接when运算符并使用otherwise. 查阅operator的scaladocwhen。
I think you could do something very similar (and perhaps more efficient) using windowed operators or user-defined aggregate functions (UDAF), but...well...don't currently know how to do it. Leaving the comment here to inspire others ;-)
我认为您可以使用窗口运算符或用户定义的聚合函数 (UDAF) 来做一些非常相似的(也许更有效)的事情,但是......好吧......目前不知道如何去做。在这里留下评论以激励他人;-)
p.s. Learnt a lot! Thanks for the idea!
ps 学到了很多!谢谢你的主意!
回答by e9f20079
Efficient solution which doesn't require expensive grouping:
不需要昂贵分组的高效解决方案:
// All groups with `red`
df.where($"color" === "red").select($"fn".alias("fn_")).distinct
// Join with input
.join(df.as("df"), $"fn" === $"fn_", "rightouter")
// Replace `color`
.withColumn("color", when($"fn_"isNull, $"color").otherwise(lit("red")))
.drop("fn_")
回答by Daniel Shields
You are conditionally updating the DataFrame if it satisfies a certain property. In this case the property is "the color column contains 'red'". The idiomatic way to express this is to filter with the desired predicate and then determine whether any rows satisfy it. There is no need for a join.
如果 DataFrame 满足某个属性,则有条件地更新它。在这种情况下,属性是“颜色列包含‘红色’”。表达这一点的惯用方法是使用所需的谓词进行过滤,然后确定是否有任何行满足它。不需要加入。
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.DataFrame
def makeAllRedIfAnyAreRed(df: DataFrame) = {
val containsRed = df.filter(df("color") === "red").count() > 0
if (containsRed) df.withColumn("color", lit("red")) else df
}
回答by mrsrinivas
As there could be few rows in filtered dataframe I'm adding solution with isin()and .withColumn()combination.
由于过滤后的数据框中可能只有几行,我正在添加解决方案isin()和.withColumn()组合。
Sample DataFrame
示例数据帧
val df = Seq(
(1, "fn", "red"),
(2, "fn", "blue"),
(3, "fn", "green"),
(4, "aa", "blue"),
(5, "aa", "green"),
(6, "bb", "red"),
(7, "bb", "red"),
(8, "aa", "blue")
).toDF("id", "dept", "color")
Now Let's pick only depts which have at least one redcolorrow and place it in broadcastvariable like below.
现在让我们只选择dept至少有一个红色color行的s并将其放入broadcast变量中,如下所示。
val depts = sc.broadcast(df.filter($"color" === "red").select(collect_set("dept")).first.getSeq[String](0)))
Update redcolor for filtered deptsrecords.
更新过滤记录的红色depts。
isin()takes a vararg so convert list to vararg (depts.value:_*)
isin()需要一个可变参数,所以将列表转换为可变参数 (depts.value:_*)
//creating new column by giving diff name (clr) to see the diff
val result = df.withColumn("clr", when($"dept".isin(depts.value:_*),lit("red"))
.otherwise($"color"))
result.show()
+---+----+-----+-----+
| id|dept|color| clr|
+---+----+-----+-----+
| 1| fn| red| red|
| 2| fn| blue| red|
| 3| fn|green| red|
| 4| aa| blue| blue|
| 5| aa|green|green|
| 6| bb| red| red|
| 7| bb| red| red|
| 8| aa| blue| blue|
+---+----+-----+-----+
回答by Vamshavardhan Reddy
Spark 2.2.0: Sample Dataframe ( taken from above examples)
Spark 2.2.0:示例数据帧(取自上述示例)
val df = Seq(
(1, "fn", "red"),
(2, "fn", "blue"),
(3, "fn", "green"),
(4, "aa", "blue"),
(5, "aa", "green"),
(6, "bb", "red"),
(7, "bb", "red"),
(8, "aa", "blue")
).toDF("id", "dept", "color")
created a UDF to perform the update by checking the condition.
创建了一个 UDF 以通过检查条件来执行更新。
val replace_val = udf((x: String,y:String) => if (Option(x).getOrElse("").equalsIgnoreCase("fn")&&(!y.equalsIgnoreCase("red"))) "red" else y)
val final_df = df.withColumn("color", replace_val($"dept",$"color"))
final_df.show()
output:
输出:
spark 1.6:
火花1.6:
val conf = new SparkConf().setMaster("local").setAppName("My app")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// For implicit conversions like converting RDDs to DataFrames
val df = sc.parallelize(Seq(
(1, "fn", "red"),
(2, "fn", "blue"),
(3, "fn", "green"),
(4, "aa", "blue"),
(5, "aa", "green"),
(6, "bb", "red"),
(7, "bb", "red"),
(8, "aa", "blue")
) ).toDF("id","dept","color")
val replace_val = udf((x: String,y:String) => if (Option(x).getOrElse("").equalsIgnoreCase("fn")&&(!y.equalsIgnoreCase("red"))) "red" else y)
val final_df = df.withColumn("color", replace_val($"dept",$"color"))
final_df.show()


