scala - Why is this simple Spark program not utilizing multiple cores?

Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question on StackOverflow: http://stackoverflow.com/questions/26828987/

Why is this simple Spark program not utilizing multiple cores?

python, scala, bigdata, apache-spark, multicore

Asked by MetallicPriest

So, I'm running this simple program on a 16-core system. I run it by issuing the following command:

spark-submit --master local[*] pi.py

And the code of that program is the following.

#"""pi.py"""
from pyspark import SparkContext
import random

N = 12500000

def sample(p):
    x, y = random.random(), random.random()
    return 1 if x*x + y*y < 1 else 0

sc = SparkContext("local", "Test App")
count = sc.parallelize(xrange(0, N)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

When I use top to see CPU consumption, only 1 core is being utilized. Why is it so? Secondly, the Spark documentation says that the default parallelism is contained in the property spark.default.parallelism. How can I read this property from within my Python program?

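As a side note on the second part of the question, here is a minimal sketch of reading the parallelism settings from Python (based on the standard PySpark SparkConf/SparkContext API, so treat it as an assumption rather than part of the original post; spark.default.parallelism only appears in the conf if it was set explicitly, while sc.defaultParallelism always reports the value Spark actually derived):

from pyspark import SparkContext

sc = SparkContext("local", "Test App")
# Explicitly configured value, or the fallback string if the property was never set.
print sc.getConf().get("spark.default.parallelism", "not set")
# The parallelism Spark actually uses (1 for plain "local").
print sc.defaultParallelism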

Answered by Ivaylo Petrov

As none of the above really worked for me (maybe because I didn't really understand them), here is my two cents.

I was starting my job with spark-submit program.py and inside the file I had sc = SparkContext("local", "Test"). I tried to verify the number of cores Spark sees with sc.defaultParallelism. It turned out that it was 1. When I changed the context initialization to sc = SparkContext("local[*]", "Test") it became 16 (the number of cores of my system) and my program was using all the cores.

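Condensed into code, the change described above looks roughly like this (a sketch, not the answerer's actual file):

from pyspark import SparkContext

# "local" pins the job to a single core; "local[*]" requests every core on the machine.
sc = SparkContext("local[*]", "Test")
print sc.defaultParallelism  # 16 on the answerer's 16-core machine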

I am quite new to Spark, but my understanding is that local by default indicates the use of one core, and as it is set inside the program, it would overwrite the other settings (for sure in my case it overwrites those from configuration files and environment variables).

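One common way to avoid that override (a sketch, assuming the standard SparkConf API) is to leave the master out of the code entirely, so that whatever spark-submit --master local[*] passes in takes effect:

from pyspark import SparkConf, SparkContext

# No setMaster() here: the --master value given to spark-submit is used instead.
conf = SparkConf().setAppName("Test App")
sc = SparkContext(conf=conf)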

Answered by Svend

Probably because the call to sc.parallelize puts all the data into one single partition. You can specify the number of partitions as 2nd argument to parallelize:

part = 16
count = sc.parallelize(xrange(N), part).map(sample).reduce(lambda a, b: a + b)

Note that this would still generate the 12 million points with one CPU in the driver and then only spread them out to 16 partitions to perform the reduce step.

A better approach would try to do most of the work after the partitioning: for example the following generates only a tiny array on the driver and then lets each remote task generate the actual random numbers and subsequent PI approximation:

part = 16
count = (sc.parallelize([0] * part, part)  # one dummy element per partition
           .flatMap(lambda blah: [sample(p) for p in xrange(N / part)])  # each task draws its own N/part samples
           .reduce(lambda a, b: a + b))

Finally (because the lazier we are the better), Spark MLlib actually comes with random data generation that is already nicely parallelized; have a look here: http://spark.apache.org/docs/1.1.0/mllib-statistics.html#random-data-generation. So maybe the following is close to what you are trying to do (not tested => probably not working, but hopefully close):

from pyspark.mllib.random import RandomRDDs

count = ( RandomRDDs.uniformRDD(sc, N, part)
        .zip(RandomRDDs.uniformRDD(sc, N, part))
        .filter(lambda (x, y): x*x + y*y < 1)
        .count()
        )

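To close the loop on this sketch: count only gives the number of points that fell inside the quarter circle, so the pi estimate itself would still be computed as in the question:

pi_estimate = 4.0 * count / N
print "Pi is roughly %f" % pi_estimate
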
Answered by user93

To change the CPU core consumption, set the number of cores to be used by the workers in the spark-env.sh file in spark-installation-directory/conf. This is done with the SPARK_EXECUTOR_CORES attribute in the spark-env.sh file. The value is set to 1 by default.

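For reference, the same executor-core setting can also be supplied programmatically instead of editing spark-env.sh; a minimal sketch (the value 4 is only an illustration, not a recommendation from this answer):

from pyspark import SparkConf, SparkContext

# spark.executor.cores is the property behind SPARK_EXECUTOR_CORES.
conf = SparkConf().set("spark.executor.cores", "4")
sc = SparkContext(conf=conf)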

Answered by xiaohan2012

I tried the method mentioned by @Svend, but it still does not work.

The following works for me:

Do NOT use the "local" master URL, for example:

sc = SparkContext("local", "Test App").

sc = SparkContext("local", "Test App").

Use the master URL like this:

sc = SparkContext("spark://your_spark_master_url:port", "Test App")

sc = SparkContext("spark://your_spark_master_url:port", "Test App")