scala Apache Spark: Get number of records per partition

Disclaimer: this page is based on a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same license and attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/46032320/

Apache Spark: Get number of records per partition

scala, apache-spark, hadoop, apache-spark-sql, partitioning

Asked by nilesh1212

I want to know how to get information about each partition, such as the total number of records per partition, on the driver side when a Spark job is submitted with deploy mode yarn-cluster, so that I can log it or print it on the console.

Answered by Alper t. Turker

I'd use the built-in function. It should be as efficient as it gets:

import org.apache.spark.sql.functions.spark_partition_id

// one row per Spark partition, with its record count
df.groupBy(spark_partition_id()).count()

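If you also want to log or print this on the driver, as the question asks, the aggregated result is only one row per partition, so it is safe to collect; a minimal sketch, assuming df is the DataFrame in question:

import org.apache.spark.sql.functions.spark_partition_id  // same import as above

// collect the tiny per-partition summary to the driver and write it to the driver log/console
df.groupBy(spark_partition_id().alias("partition_id"))
  .count()
  .collect()
  .foreach(row => println(s"partition ${row.getInt(0)}: ${row.getLong(1)} records"))
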
Answered by Raphael Roth

You can get the number of records per partition like this:

import spark.implicits._  // spark is the SparkSession (as in spark-shell); needed for toDF on an RDD of tuples

df
  .rdd
  .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
  .toDF("partition_number", "number_of_records")
  .show()

But this will also launch a Spark job of its own (because Spark has to read the data to count the records).

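If the same DataFrame is reused afterwards, caching it first means the input is only read once; a rough sketch under that assumption (the cached data must fit the available storage memory/disk):

df.cache()  // the first action still reads the input and fills the cache; later actions on df reuse it

// the counting job and any later work on df now share a single pass over the input
val perPartitionCounts = df.rdd
  .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
  .collect()  // Array[(partitionIndex, recordCount)] on the driver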

Spark may also be able to read Hive table statistics, but I don't know how to display that metadata.

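For what it's worth, one way to surface such statistics is plain SQL; a hedged sketch, assuming a Hive-enabled SparkSession and a hypothetical table name my_table (this gives table-level row counts from the metastore, not Spark partition counts):

// compute row-count/size statistics and store them in the metastore
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS")

// the collected statistics (numRows, sizeInBytes, ...) show up in the table details
spark.sql("DESCRIBE EXTENDED my_table").show(100, truncate = false)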

Answered by Ram Ghadiyaram

Spark 1.5 solution:

(sparkPartitionId() exists in org.apache.spark.sql.functions)

import org.apache.spark.sql.functions._

// tag each row with its partition id, then count rows per partition
df.withColumn("partitionId", sparkPartitionId()).groupBy("partitionId").count().show()

As mentioned by @Raphael Roth, mapPartitionsWithIndex is the best approach; it will work with every version of Spark, since it is an RDD-based approach.

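The same pattern also works directly on any RDD, without going through a DataFrame; a minimal sketch (the parallelized range is just a stand-in for whatever RDD you are inspecting):

// example RDD with 4 partitions; replace it with the RDD you actually care about
val rdd = sc.parallelize(1 to 100, 4)

// one (partitionIndex, recordCount) pair per partition, collected to the driver
val countsPerPartition = rdd
  .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }
  .collect()

countsPerPartition.foreach { case (i, n) => println(s"partition $i: $n records") }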

Answered by BishoyM

For future PySpark users:

from pyspark.sql.functions import spark_partition_id
rawDf.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()

Answered by Tagar

Spark/Scala:

val numPartitions = 20000
val a = sc.parallelize(0 until 1e6.toInt, numPartitions)
val l = a.glom().map(_.length).collect()  // get the length of each partition
println((l.min, l.max, l.sum / l.length, l.length))  // check if skewed

PySpark:

num_partitions = 20000
a = sc.parallelize(range(int(1e6)), num_partitions)
l = a.glom().map(len).collect()  # get length of each partition
print(min(l), max(l), sum(l)/len(l), len(l))  # check if skewed

The same is possible for a DataFrame, not just for an RDD; just use DF.rdd.glom... in the code above.

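For example, a minimal Scala sketch of the DataFrame variant (df stands for whatever DataFrame you are inspecting):

// per-partition record counts for a DataFrame, via its underlying RDD
val counts = df.rdd.glom().map(_.length).collect()
println((counts.min, counts.max, counts.sum / counts.length, counts.length))  // check if skewed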

Credits: Mike Dusenberry @ https://issues.apache.org/jira/browse/SPARK-17817
