Scala: how to use Spark to generate a huge amount of random integers?

Note: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same CC BY-SA license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/29069080/

How to use spark to generate huge amount of random integers?

Tags: scala, apache-spark

Asked by Haoliang

I need lots of random numbers, one per line. The result should be something like this:

24324
4234234
1310313
...

So I wrote this Spark code (sorry, I'm new to Spark and Scala):

import util.Random

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object RandomIntegerWriter {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: RandomIntegerWriter <num Integers> <outDir>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark RandomIntegerWriter")
    val spark = new SparkContext(conf)
    val distData = spark.parallelize(Seq.fill(args(0).toInt)(Random.nextInt))
    distData.saveAsTextFile(args(1))
    spark.stop()
  }
}

Note: for now I just want to generate one number per line.

But it seems that when the number of integers gets larger, the program reports an error. Any ideas about this piece of code?

Thank you.

Answered by vmhacker

In Spark 1.4 you can use the DataFrame API to do this:

In [1]: from pyspark.sql.functions import rand, randn
In [2]: # Create a DataFrame with one int column and 10 rows.
In [3]: df = sqlContext.range(0, 10)
In [4]: df.show()
+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+

In [4]: # Generate two other columns using uniform distribution and normal distribution.
In [5]: df.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()
+--+-------------------+--------------------+
|id|            uniform|              normal|
+--+-------------------+--------------------+
| 0| 0.7224977951905031| -0.1875348803463305|
| 1| 0.2953174992603351|-0.26525647952450265|
| 2| 0.4536856090041318| -0.7195024130068081|
| 3| 0.9970412477032209|  0.5181478766595276|
| 4|0.19657711634539565|  0.7316273979766378|
| 5|0.48533720635534006| 0.07724879367590629|
| 6| 0.7369825278894753| -0.5462256961278941|
| 7| 0.5241113627472694| -0.2542275002421211|
| 8| 0.2977697066654349| -0.5752237580095868|
| 9| 0.5060159582230856|  1.0900096472044518|
+--+-------------------+--------------------+
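
Since the question is in Scala, a rough Scala equivalent of the same DataFrame approach might look like the following sketch (assuming Spark 1.4+ with an existing SQLContext bound to sqlContext):

import org.apache.spark.sql.functions.{rand, randn}

// 10 rows with an "id" column, plus a uniform and a normal random column
val df = sqlContext.range(0, 10)
df.select(df("id"), rand(10).alias("uniform"), randn(27).alias("normal")).show()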

Answered by cloud

Try:

val distData = spark.parallelize(Seq[Int](), numPartitions)
  .mapPartitions { _ => {
    (1 to recordsPerPartition).map{_ => Random.nextInt}.iterator
  }}

It will create an empty collection on the driver side but generate many random integers on the worker side. The total number of records is numPartitions * recordsPerPartition.

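A more complete, self-contained sketch of how this could be wired into the original program (spark is the SparkContext from the question; the partition and record counts are illustrative assumptions, not part of the original answer):

import scala.util.Random

val numPartitions = 100            // illustrative: number of parallel tasks
val recordsPerPartition = 1000000  // illustrative: total records = numPartitions * recordsPerPartition

val distData = spark.parallelize(Seq[Int](), numPartitions)
  .mapPartitions { _ =>
    // runs on the executors: each partition generates its own records
    (1 to recordsPerPartition).iterator.map(_ => Random.nextInt)
  }

distData.saveAsTextFile(args(1)) // same output directory argument as in the question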

Answered by maasg

Running on a Spark Cluster

The current version materializes the collection of random numbers in the driver's memory. If that collection is very large, the driver will run out of memory. Note that this version also doesn't make use of Spark's processing capabilities, since it only uses Spark to save the data after it has been created.

Assuming we are working on a cluster, what we need to do is distribute the work of generating the data among the executors. One way to do that is to transform the original algorithm into a version that works across the cluster by dividing the work among the executors:

val numRecords: Int = ???
val partitions: Int = ???
// assuming numRecords is divisible by partitions; otherwise compensate for the remainder
val recordsPerPartition = numRecords / partitions

val seedRdd = sparkContext.parallelize(Seq.fill(partitions)(recordsPerPartition), partitions)
val randomNrs = seedRdd.flatMap(records => Seq.fill(records)(Random.nextInt))
randomNrs.saveAsTextFile(...)

Running on a single machine

If we don't have a cluster and this is meant to run on a single machine, the question becomes "why use Spark?". This random-number generation is basically I/O bound and can be done in O(1) memory by sequentially writing random numbers to a file.

import java.io._
import scala.annotation.tailrec
import scala.util.Random

def randomFileWriter(file: String, records: Long): Unit = {
  val pw = new PrintWriter(new BufferedWriter(new FileWriter(file)))
  // tail-recursive loop: writes one random integer per line, using constant memory
  @tailrec
  def loop(count: Long): Unit = {
    if (count > 0) {
      pw.println(Random.nextInt)
      loop(count - 1)
    }
  }
  loop(records)
  pw.close()
}
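
For example, writing one hundred million random integers to a local file (the file name here is just an illustration):

randomFileWriter("random-ints.txt", 100000000L)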

Answered by Antonio Cachuan

Working in Spark 2.3.0

Python:
df = spark.range(0, 10)

Scala:
val df = spark.range(0, 10)
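
To tie this back to the original question, one way to turn spark.range into a large text output of random integers, one per line, could be the following sketch (the record count, partition count, and output path are illustrative assumptions; note that rand() produces uniform doubles in [0, 1), so this yields non-negative integers only):

import org.apache.spark.sql.functions.rand

val numRecords = 1000000000L // illustrative
val numPartitions = 200      // illustrative

spark.range(0, numRecords, 1, numPartitions)
  .select((rand() * Int.MaxValue).cast("int").alias("value"))
  .write.csv("/tmp/random-ints") // illustrative output path; one integer per line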