如何在带有 Spark 的 Scala 中使用 countDistinct?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/33500816/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to use countDistinct in Scala with Spark?
提问by Adam H
I've tried to use countDistinctfunction which should be available in Spark 1.5 according to DataBrick's blog. However, I got the following exception:
根据DataBrick 的博客,我尝试使用应该在 Spark 1.5 中可用的countDistinct函数。但是,我得到以下异常:
Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function countDistinct;
I've found that on Spark developers' mail listthey suggest using countand distinctfunctions to get the same result which should be produced by countDistinct:
我发现在Spark 开发人员的邮件列表中,他们建议使用count和distinct函数来获得应由countDistinct生成的相同结果:
count(distinct <columnName>)
// Instead
countDistinct(<columnName>)
Because I build aggregation expressions dynamically from the list of the names of aggregation functions I'd prefer to don't have any special cases which require different treating.
因为我从聚合函数的名称列表中动态构建聚合表达式,所以我不希望有任何需要不同处理的特殊情况。
So, is it possible to unify it by:
那么,是否可以通过以下方式统一它:
- registering new UDAF which will be an alias for count(distinct columnName)
registering manually already implemented in Spark CountDistinct function which is probably one from following import:
import org.apache.spark.sql.catalyst.expressions.{CountDistinctFunction, CountDistinct}
or do it in any other way?
- 注册新的 UDAF,它将成为count(distinct columnName)的别名
手动注册已经在 Spark CountDistinct 函数中实现,这可能是以下导入中的一个:
导入 org.apache.spark.sql.catalyst.expressions.{CountDistinctFunction, CountDistinct}
或以任何其他方式进行?
EDIT: Example (with removed some local references and unnecessary code):
编辑:示例(删除了一些本地引用和不必要的代码):
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Column, SQLContext, DataFrame}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer
class Flattener(sc: SparkContext) {
val sqlContext = new SQLContext(sc)
def flatTable(data: DataFrame, groupField: String): DataFrame = {
val flatteningExpressions = data.columns.zip(TypeRecognizer.getTypes(data)).
flatMap(x => getFlatteningExpressions(x._1, x._2)).toList
data.groupBy(groupField).agg (
expr(s"count($groupField) as groupSize"),
flatteningExpressions:_*
)
}
private def getFlatteningExpressions(fieldName: String, fieldType: DType): List[Column] = {
val aggFuncs = getAggregationFunctons(fieldType)
aggFuncs.map(f => expr(s"$f($fieldName) as ${fieldName}_$f"))
}
private def getAggregationFunctons(fieldType: DType): List[String] = {
val aggFuncs = new ListBuffer[String]()
if(fieldType == DType.NUMERIC) {
aggFuncs += ("avg", "min", "max")
}
if(fieldType == DType.CATEGORY) {
aggFuncs += "countDistinct"
}
aggFuncs.toList
}
}
采纳答案by Adam H
countDistinct can be used in two different forms:
countDistinct 可以以两种不同的形式使用:
df.groupBy("A").agg(expr("count(distinct B)")
or
或者
df.groupBy("A").agg(countDistinct("B"))
However, neither of these methods work when you want to use them on the same column with your custom UDAF (implemented as UserDefinedAggregateFunction in Spark 1.5):
但是,当您希望将它们与自定义 UDAF(在 Spark 1.5 中作为 UserDefinedAggregateFunction 实现)在同一列上使用它们时,这两种方法都不起作用:
// Assume that we have already implemented and registered StdDev UDAF
df.groupBy("A").agg(countDistinct("B"), expr("StdDev(B)"))
// Will cause
Exception in thread "main" org.apache.spark.sql.AnalysisException: StdDev is implemented based on the new Aggregate Function interface and it cannot be used with functions implemented based on the old Aggregate Function interface.;
Due to these limitation it looks that the most reasonable is implementing countDistinct as a UDAF what should allow to treat all functions in the same way as well as use countDistinct along with other UDAFs.
由于这些限制,看起来最合理的是将 countDistinct 实现为 UDAF,它应该允许以相同的方式处理所有函数以及将 countDistinct 与其他 UDAF 一起使用。
The example implementation can look like this:
示例实现可能如下所示:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
class CountDistinct extends UserDefinedAggregateFunction{
override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = (buffer.getSeq[String](0).toSet + input.getString(0)).toSeq
}
override def bufferSchema: StructType = StructType(
StructField("items", ArrayType(StringType, true)) :: Nil
)
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = (buffer1.getSeq[String](0).toSet ++ buffer2.getSeq[String](0).toSet).toSeq
}
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Seq[String]()
}
override def deterministic: Boolean = true
override def evaluate(buffer: Row): Any = {
buffer.getSeq[String](0).length
}
override def dataType: DataType = IntegerType
}
回答by alghimo
Not sure if I really understood your problem, but this is an example for the countDistinct aggregated function:
不确定我是否真的理解您的问题,但这是 countDistinct 聚合函数的示例:
val values = Array((1, 2), (1, 3), (2, 2), (1, 2))
val myDf = sc.parallelize(values).toDF("id", "foo")
import org.apache.spark.sql.functions.countDistinct
myDf.groupBy('id).agg(countDistinct('foo) as 'distinctFoo) show
/**
+---+-------------------+
| id|COUNT(DISTINCT foo)|
+---+-------------------+
| 1| 2|
| 2| 1|
+---+-------------------+
*/

