Scala 与 Python 的 Spark 性能
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/32464122/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Spark performance for Scala vs Python
提问 by Mrityunjay
I prefer Python over Scala. But, as Spark is natively written in Scala, I was expecting my code to run faster in Scala than the Python version for obvious reasons.
相比 Scala,我更喜欢 Python。但是,由于 Spark 本身是用 Scala 编写的,我希望我的代码在 Scala 中运行得比 Python 版本更快,原因很明显。
With that assumption, I thought to learn & write the Scala version of some very common preprocessing code for some 1 GB of data. Data is picked from the SpringLeaf competition on Kaggle. Just to give an overview of the data (it contains 1936 dimensions and 145232 rows). Data is composed of various types e.g. int, float, string, boolean. I am using 6 cores out of 8 for Spark processing; that's why I used minPartitions=6 so that every core has something to process.
有了这个假设，我想为大约 1 GB 的数据学习并编写一些非常常见的预处理代码的 Scala 版本。数据来自 Kaggle 上的 SpringLeaf 竞赛。先概述一下数据：它包含 1936 个维度和 145232 行，由各种类型组成，例如 int、float、string、boolean。我使用 8 个内核中的 6 个进行 Spark 处理；这就是我使用 minPartitions=6 的原因，以便每个内核都有要处理的内容。
Scala Code
Scala 代码
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = 'from pyspark.sql.functions import col
col("foo")
1'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
1"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
Python Code
Python代码
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    vals2 = ['VAR_%04d%s%s' % (e, DELIM_1, val.strip('\"'))
             for e, val in enumerate(vals)]
    return vals2

input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
    key, val = kv.split(DELIM_1)
    return (key, val)

input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
    return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
Scala Performance: Stage 0 (38 mins), Stage 1 (18 sec)

Scala 性能：第 0 阶段（38 分钟），第 1 阶段（18 秒）

Python Performance: Stage 0 (11 mins), Stage 1 (7 sec)

Python 性能：第 0 阶段（11 分钟），第 1 阶段（7 秒）

Both produce different DAG visualization graphs (which is why the two pictures show different stage 0 functions for Scala (map) and Python (reduceByKey)).
两者产生的 DAG 可视化图不同（因此两张图中 Scala（map）和 Python（reduceByKey）的第 0 阶段函数也不同）。
But essentially, both pieces of code try to transform the data into an RDD of (dimension_id, string of list of values) and save it to disk. The output will be used to compute various statistics for each dimension.
但是,基本上这两个代码都试图将数据转换为(dimension_id,值列表字符串)RDD 并保存到磁盘。输出将用于计算每个维度的各种统计数据。
Performance-wise, the Scala code for real data like this seems to run 4 times slower than the Python version. The good news for me is that it gave me good motivation to stay with Python. The bad news is that I didn't quite understand why.
在性能方面,像这样的真实数据的 Scala 代码似乎比 Python 版本慢 4 倍。对我来说好消息是它给了我继续使用 Python 的良好动力。坏消息是我不太明白为什么?
回答 by zero323
The original answer discussing the code can be found below.
可以在下面找到讨论代码的原始答案。
First of all, you have to distinguish between different types of API, each with its own performance considerations.
首先,您必须区分不同类型的 API,每种类型都有自己的性能考虑。
RDD API
RDD API
(pure Python structures with JVM based orchestration)
(具有基于 JVM 的编排的纯 Python 结构)
This is the component which will be most affected by the performance of the Python code and the details of the PySpark implementation. While Python performance is rather unlikely to be a problem, there are at least a few factors you have to consider:
这是受 Python 代码性能和 PySpark 实现细节影响最大的组件。虽然 Python 性能不太可能成为问题,但您至少需要考虑几个因素:
- Overhead of JVM communication. Practically all data that comes to and from the Python executor has to be passed through a socket and a JVM worker. While this is relatively efficient local communication, it is still not free.
- Process-based executors (Python) versus thread-based (single JVM, multiple threads) executors (Scala). Each Python executor runs in its own process. As a side effect, it provides stronger isolation than its JVM counterpart and some control over the executor lifecycle, but potentially significantly higher memory usage:
  - interpreter memory footprint
  - footprint of the loaded libraries
  - less efficient broadcasting (each process requires its own copy of a broadcast)
- Performance of Python code itself. Generally speaking, Scala is faster than Python, but it will vary from task to task. Moreover, you have multiple options including JITs like Numba, C extensions (Cython) or specialized libraries like Theano. Finally, if you don't use ML / MLlib (or simply the NumPy stack), consider using PyPy as an alternative interpreter. See SPARK-3094.
- PySpark configuration provides the spark.python.worker.reuse option, which can be used to choose between forking a Python process for each task and reusing an existing process (see the sketch after this list). The latter option seems to be useful to avoid expensive garbage collection (it is more an impression than a result of systematic tests), while the former one (default) is optimal in case of expensive broadcasts and imports.
- Reference counting, used as the first-line garbage collection method in CPython, works pretty well with typical Spark workloads (stream-like processing, no reference cycles) and reduces the risk of long GC pauses.
- JVM 通信的开销。实际上，所有进出 Python 执行器的数据都必须通过套接字和 JVM worker 传递。虽然这是一种相对高效的本地通信，但它仍然不是免费的。
- 基于进程的执行器（Python）与基于线程的（单 JVM 多线程）执行器（Scala）。每个 Python 执行器都在自己的进程中运行。作为副作用，它提供了比 JVM 对应物更强的隔离以及对执行器生命周期的一些控制，但内存占用可能显著更高：
  - 解释器的内存占用
  - 已加载库的内存占用
  - 广播效率较低（每个进程都需要自己的广播副本）
- Python 代码本身的性能。一般来说，Scala 比 Python 快，但会因任务而异。此外，您还有多种选择，包括 Numba 之类的 JIT、C 扩展（Cython）或 Theano 之类的专用库。最后，如果您不使用 ML / MLlib（或只是 NumPy 技术栈），可以考虑使用 PyPy 作为替代解释器。见 SPARK-3094。
- PySpark 配置提供了 spark.python.worker.reuse 选项，可用于在为每个任务派生新的 Python 进程和重用现有进程之间进行选择（见列表后的示例）。后一个选项似乎有助于避免昂贵的垃圾回收（这更多是一种印象，而非系统测试的结果），而前一个（默认值）在广播和导入开销较大的场景下是最佳选择。
- 引用计数作为 CPython 的第一道垃圾回收机制，在典型的 Spark 工作负载（类流式处理、无引用循环）下表现很好，并降低了长时间 GC 暂停的风险。
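As a minimal sketch of the worker-reuse point (the master URL and application name below are placeholders, and the appropriate value of the flag depends on your workload and Spark version), the option is just a regular SparkConf entry:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local[6]")                      # placeholder master
        .setAppName("worker-reuse-demo")            # placeholder app name
        # Reuse Python workers across tasks instead of forking one per task;
        # check your Spark version's documentation for the default behaviour.
        .set("spark.python.worker.reuse", "true"))

sc = SparkContext(conf=conf)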
MLlib
MLlib
(mixed Python and JVM execution)
(混合 Python 和 JVM 执行)
Basic considerations are pretty much the same as before with a few additional issues. While basic structures used with MLlib are plain Python RDD objects, all algorithms are executed directly using Scala.
基本注意事项与以前几乎相同,但有一些其他问题。虽然与 MLlib 一起使用的基本结构是普通的 Python RDD 对象,但所有算法都是直接使用 Scala 执行的。
It means an additional cost of converting Python objects to Scala objects and the other way around, increased memory usage and some additional limitations we'll cover later.
这意味着将 Python 对象转换为 Scala 对象会产生额外成本,反之亦然,增加内存使用量以及我们稍后将介绍的一些额外限制。
As of now (Spark 2.x), the RDD-based API is in a maintenance mode and is scheduled to be removed in Spark 3.0.
截至目前(Spark 2.x),基于 RDD 的 API 处于维护模式,计划在 Spark 3.0 中删除。
DataFrame API and Spark ML
DataFrame API 和 Spark ML
(JVM execution with Python code limited to the driver)
(JVM 执行的 Python 代码仅限于驱动程序)
These are probably the best choice for standard data processing tasks. Since Python code is mostly limited to high-level logical operations on the driver, there should be no performance difference between Python and Scala.
这些可能是标准数据处理任务的最佳选择。由于 Python 代码主要限于驱动程序上的高级逻辑操作,因此 Python 和 Scala 之间应该没有性能差异。
A single exception is the usage of row-wise Python UDFs, which are significantly less efficient than their Scala equivalents. While there is some chance for improvement (there has been substantial development in Spark 2.0.0), the biggest limitation is the full roundtrip between the internal representation (JVM) and the Python interpreter. If possible, you should favor a composition of built-in expressions (example). Python UDF behavior has been improved in Spark 2.0.0, but it is still suboptimal compared to native execution.
一个例外是按行执行的 Python UDF，其效率明显低于 Scala 等效实现。虽然有一些改进的空间（Spark 2.0.0 已有实质性进展），但最大的限制是内部表示（JVM）与 Python 解释器之间的完整往返。如果可能，您应该优先使用内置表达式的组合（示例）。Python UDF 的行为在 Spark 2.0.0 中已有改进，但与原生执行相比仍然不是最优的。
This has improved significantly with the introduction of vectorized UDFs (SPARK-21190 and further extensions), which use Arrow streaming for efficient data exchange with zero-copy deserialization. For most applications their secondary overheads can be just ignored.
随着向量化 UDF（SPARK-21190 及后续扩展）的引入，这一点已得到显著改善；向量化 UDF 使用 Arrow 流式传输实现零拷贝反序列化的高效数据交换。对于大多数应用程序，其次要开销可以忽略不计。
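To make the trade-off concrete, here is a hedged sketch (it assumes Spark 2.3+ with PyArrow installed for the pandas UDF variant; the DataFrame and column names are toy examples, not from the answer) of the same doubling operation expressed three ways:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, pandas_udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (2, 2.0)], ["id", "value"])

# 1. Built-in expression: evaluated entirely inside the JVM, no Python round trip.
df.select((col("value") * 2).alias("doubled"))

# 2. Row-wise Python UDF: every value is serialized to a Python worker and back.
double_udf = udf(lambda x: x * 2, DoubleType())
df.select(double_udf(col("value")).alias("doubled"))

# 3. Vectorized (pandas) UDF: whole Arrow batches are exchanged instead of single rows.
@pandas_udf(DoubleType())
def double_vec(s):
    return s * 2

df.select(double_vec(col("value")).alias("doubled"))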
Also be sure to avoid unnecessarily passing data between DataFrames and RDDs. This requires expensive serialization and deserialization, not to mention data transfer to and from the Python interpreter.
还要确保避免在 DataFrames 和 RDDs 之间进行不必要的数据传递。这需要昂贵的序列化和反序列化，更不用说与 Python 解释器之间的数据传输了。
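For contrast, a small sketch (reusing the toy DataFrame from the previous snippet; the column names are made up) of the kind of detour this warning is about:

from pyspark.sql.functions import col

# Dropping to the RDD API pickles every Row out to the Python workers and back.
via_rdd = (df.rdd
           .map(lambda row: (row["id"], row["value"] * 2))
           .toDF(["id", "doubled"]))

# The equivalent DataFrame expression never leaves the JVM.
via_expr = df.select("id", (col("value") * 2).alias("doubled"))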
It is worth noting that Py4J calls have pretty high latency. This includes simple calls like:
值得注意的是,Py4J 调用具有相当高的延迟。这包括简单的调用,例如:
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}
val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
case ("true", i) => (i, "1")
case ("false", i) => (i, "0")
case p => p.swap
})
val result = pairs.groupByKey.map{
case (k, vals) => {
val valsString = vals.mkString(",")
s"$k,$valsString"
}
}
result.saveAsTextFile("scalaout")
Usually, it shouldn't matter (overhead is constant and doesn't depend on the amount of data) but in the case of soft real-time applications, you may consider caching/reusing Java wrappers.
通常,这无关紧要(开销是恒定的并且不取决于数据量),但是在软实时应用程序的情况下,您可以考虑缓存/重用 Java 包装器。
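One small sketch of what caching a wrapper can look like (the column and function names are made up for illustration, not taken from the answer):

from pyspark.sql.functions import col

# Every col(...) call is a Py4J round trip; hoist wrappers used on hot paths.
FOO = col("foo")

def add_foo_flag(df):
    # Reuses the cached Column wrapper instead of rebuilding it on each call.
    return df.withColumn("foo_flag", FOO > 0)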
GraphX and Spark DataSets
GraphX 和 Spark 数据集
As of now (Spark 2.1), neither one provides a PySpark API, so you can say that PySpark is infinitely worse than Scala.
目前（Spark 2.1）两者都没有提供 PySpark API，所以你可以说 PySpark 比 Scala 差得多。
In practice, GraphX development stopped almost completely and the project is currently in maintenance mode, with related JIRA tickets closed as won't fix. The GraphFrames library provides an alternative graph processing library with Python bindings.
在实践中，GraphX 的开发几乎完全停止，该项目目前处于维护模式，相关 JIRA 工单已以“won't fix”关闭。GraphFrames 库提供了一个带有 Python 绑定的替代图处理库。
Datasets
数据集
Subjectively speaking, there is not much place for statically typed Datasets in Python, and even if there was, the current Scala implementation is too simplistic and doesn't provide the same performance benefits as DataFrame.
主观上讲，Python 中并没有太多静态类型 Datasets 的用武之地；即使有，目前的 Scala 实现也过于简单，无法提供与 DataFrame 相同的性能优势。
Streaming
流媒体
From what I've seen so far, I would strongly recommend using Scala over Python. It may change in the future if PySpark gets support for structured streams but right now Scala API seems to be much more robust, comprehensive and efficient. My experience is quite limited.
从我目前所见,我强烈建议使用 Scala 而不是 Python。如果 PySpark 获得对结构化流的支持,未来可能会发生变化,但现在 Scala API 似乎更加健壮、全面和高效。我的经验非常有限。
Structured streaming in Spark 2.x seems to reduce the gap between the languages, but for now it is still in its early days. Nevertheless, the RDD-based API is already referenced as "legacy streaming" in the Databricks documentation (date of access 2017-03-03), so it is reasonable to expect further unification efforts.
Spark 2.x 中的结构化流似乎缩小了语言之间的差距，但目前仍处于早期阶段。尽管如此，基于 RDD 的 API 已经在 Databricks 文档（访问日期 2017-03-03）中被称为“遗留流（legacy streaming）”，因此有理由期待进一步的统一工作。
Non-performance considerations
非性能方面的考虑
Feature parity
功能对等
Not all Spark features are exposed through the PySpark API. Be sure to check if the parts you need are already implemented and try to understand possible limitations.
并非所有 Spark 功能都通过 PySpark API 公开。请务必检查您需要的部分是否已经实现，并尝试了解可能的限制。
It is particularly important when you use MLlib and similar mixed contexts (see Calling Java/Scala function from a task). To be fair, some parts of the PySpark API, like mllib.linalg, provide a more comprehensive set of methods than Scala.
当您使用 MLlib 和类似的混合上下文时,这一点尤为重要(请参阅从任务中调用 Java/Scala 函数)。公平地说,PySpark API 的某些部分,例如mllib.linalg,提供了比 Scala 更全面的方法集。
The PySpark API closely reflects its Scala counterpart and as such is not exactly Pythonic. It means that it is pretty easy to map between languages but at the same time, Python code can be significantly harder to understand.
PySpark API 密切反映了它的 Scala 对应物,因此不完全是 Pythonic。这意味着在语言之间进行映射非常容易,但与此同时,Python 代码可能更难理解。
Complex architecture
复杂的架构
PySpark data flow is relatively complex compared to pure JVM execution. It is much harder to reason about or debug PySpark programs. Moreover, at least a basic understanding of Scala and the JVM in general is pretty much a must-have.
与纯 JVM 执行相比，PySpark 的数据流相对复杂，对 PySpark 程序进行推理或调试要困难得多。此外，至少对 Scala 和 JVM 有基本的了解几乎是必须的。
Spark 2.x and beyond
Spark 2.x 及更高版本
The ongoing shift towards the Dataset API, with a frozen RDD API, brings both opportunities and challenges for Python users. While high-level parts of the API are much easier to expose in Python, the more advanced features are pretty much impossible to use directly.
向 Dataset API 的持续转变以及被冻结的 RDD API，为 Python 用户带来了机遇和挑战。虽然 API 的高层部分在 Python 中更容易公开，但更高级的功能几乎不可能直接使用。
Moreover, native Python functions continue to be second-class citizens in the SQL world. Hopefully this will improve in the future with Apache Arrow serialization (current efforts target data collection, but UDF serde is a long-term goal).
此外，原生 Python 函数在 SQL 世界中仍然是二等公民。希望将来 Apache Arrow 序列化能改善这一点（目前的工作目标是数据收集（collection），而 UDF 的序列化/反序列化是一个长期目标）。
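As a hedged illustration of the collection-side Arrow work (it assumes Spark 2.3+ with PyArrow and pandas installed; newer releases rename the flag to spark.sql.execution.arrow.pyspark.enabled):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000)  # toy data

# Spark 2.3/2.4 flag name for Arrow-backed toPandas()/createDataFrame(pandas_df).
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = df.toPandas()  # collected as Arrow record batches rather than pickled rows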
For projects strongly depending on the Python codebase, pure Python alternatives (like Dask or Ray) could be an interesting option.
对于强烈依赖 Python 代码库的项目，纯 Python 替代方案（如 Dask 或 Ray）可能是一个有趣的选择。
It doesn't have to be one vs. the other
不一定非要一对一
The Spark DataFrame (SQL, Dataset) API provides an elegant way to integrate Scala/Java code in a PySpark application. You can use DataFrames to expose data to native JVM code and read back the results. I've explained some options somewhere else, and you can find a working example of a Python-Scala roundtrip in How to use a Scala class inside Pyspark.
Spark DataFrame（SQL、Dataset）API 提供了一种在 PySpark 应用程序中集成 Scala/Java 代码的优雅方式。您可以使用 DataFrames 将数据公开给原生 JVM 代码并读回结果。我已经在其他地方解释了一些选项，您可以在 How to use a Scala class inside Pyspark 中找到 Python-Scala 往返的工作示例。
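A rough sketch of such a round trip follows; everything here is an assumption for illustration: com.example.Transform and its run(df: DataFrame): DataFrame method stand in for your own Scala helper on the classpath, and _jvm, _jdf and sql_ctx are private PySpark attributes whose exact shape depends on the Spark version.

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)  # any DataFrame to hand over

# Hand the underlying Java DataFrame to the (hypothetical) Scala helper via Py4J.
jvm = spark.sparkContext._jvm
jdf = jvm.com.example.Transform.run(df._jdf)

# Wrap the returned Java DataFrame back into a Python DataFrame
# (on newer releases prefer df.sparkSession over the deprecated sql_ctx).
result = DataFrame(jdf, df.sql_ctx)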
It can be further augmented by introducing User Defined Types (see How to define schema for custom type in Spark SQL?).
可以通过引入用户定义类型进一步增强它(请参阅如何在 Spark SQL 中为自定义类型定义模式?)。
What is wrong with code provided in the question
问题中提供的代码有什么问题
(Disclaimer: Pythonista point of view. Most likely I've missed some Scala tricks)
(免责声明:Pythonista 的观点。很可能我错过了一些 Scala 技巧)
First of all, there is one part of your code which doesn't make sense at all. If you already have (key, value) pairs created using zipWithIndex or enumerate, what is the point of creating a string just to split it right afterwards? flatMap doesn't work recursively, so you can simply yield tuples and skip the following map whatsoever.
首先，您的代码中有一个部分根本说不通。如果您已经使用 zipWithIndex 或 enumerate 创建了 (key, value) 对，那么先拼成字符串、随后又立即拆分它有什么意义？flatMap 并不会递归展开，因此您完全可以直接生成元组并跳过后面的 map。
Another part I find problematic is reduceByKey. Generally speaking, reduceByKey is useful if applying the aggregate function can reduce the amount of data that has to be shuffled. Since you simply concatenate strings, there is nothing to gain here. Ignoring low-level stuff, like the number of references, the amount of data you have to transfer is exactly the same as for groupByKey.
我发现有问题的另一部分是 reduceByKey。一般来说，如果应用聚合函数可以减少必须 shuffle 的数据量，reduceByKey 才有用。由于您只是连接字符串，这里没有任何好处。忽略底层细节（比如引用的数量），您必须传输的数据量与 groupByKey 完全相同。
Normally I wouldn't dwell on that, but as far as I can tell it is a bottleneck in your Scala code. Joining strings on the JVM is a rather expensive operation (see for example: Is string concatenation in Scala as costly as it is in Java?). It means that something like input4.reduceByKey(valsConcat) in your code, which is equivalent to _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2), is not a good idea.
通常我不会纠结这一点，但据我所知，这正是 Scala 代码中的瓶颈。在 JVM 上连接字符串是一项相当昂贵的操作（参见例如：Scala 中的字符串连接是否与 Java 中的一样昂贵？）。这意味着您代码中的 input4.reduceByKey(valsConcat)（等同于 _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)）不是一个好主意。
If you want to avoid groupByKey, you can try to use aggregateByKey with StringBuilder. Something similar to this should do the trick:
如果你想避免 groupByKey，可以尝试将 aggregateByKey 与 StringBuilder 搭配使用。类似下面这样的代码应该可以解决问题：
rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)
but I doubt it is worth all the fuss.
但我怀疑这是否值得大惊小怪。
Keeping the above in mind, I've rewritten your code as follows:
牢记上述内容,我已将您的代码重写如下:
Scala:
斯卡拉:
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) => {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")
Python:
Python：
def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

Results
结果
In local[6] mode (Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz) with 4GB memory per executor it takes (n = 3):
在 local[6] 模式下（Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz，每个执行器 4GB 内存），耗时如下（n = 3）：
- Scala - mean: 250.00s, stdev: 12.49
- Python - mean: 246.66s, stdev: 1.15
- Scala - 平均值:250.00s,标准差:12.49
- Python - 平均值:246.66s,标准差:1.15
I am pretty sure that most of that time is spent on shuffling, serializing, deserializing and other secondary tasks. Just for fun, here's naive single-threaded code in Python that performs the same task on this machine in less than a minute:
我很确定大部分时间都花在了改组、序列化、反序列化和其他次要任务上。只是为了好玩,这里是 Python 中的简单单线程代码,可以在不到一分钟的时间内在这台机器上执行相同的任务:
def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])

回答 by Mrityunjay
Extension to above answers -
对上述答案的扩展 -
Scala proves faster in many ways compared to Python, but there are some valid reasons why Python is becoming more popular than Scala. Let's look at a few of them:
与 Python 相比,Scala 在许多方面都被证明更快,但 Python 比 Scala 更受欢迎的原因有很多,让我们看看其中的几个——
Python for Apache Spark is pretty easy to learn and use. However, this is not the only reason why PySpark is a better choice than Scala. There's more.
Python for Apache Spark 非常容易学习和使用。然而,这并不是 Pyspark 比 Scala 更好的选择的唯一原因。还有更多。
The Python API for Spark may be slower on the cluster, but in the end, data scientists can do a lot more with it compared to Scala. The complexity of Scala is absent. The interface is simple and comprehensive.
用于 Spark 的 Python API 在集群上可能更慢,但最终,与 Scala 相比,数据科学家可以用它做更多的事情。Scala 的复杂性不存在。界面简单而全面。
Talking about code readability, maintenance, and familiarity, the Python API for Apache Spark is far better than Scala's.
说起代码的可读性、维护性以及对Python API for Apache Spark 的熟悉程度,都远胜于Scala。
Python comes with several libraries related to machine learning and natural language processing. This aids in data analysis, and these libraries are mature and time-tested. For instance, numpy, pandas, scikit-learn, seaborn and matplotlib.
Python 附带了几个与机器学习和自然语言处理相关的库。这有助于数据分析,并且还具有非常成熟且经过时间考验的统计数据。例如,numpy、pandas、scikit-learn、seaborn 和 matplotlib。
Note: Most data scientists use a hybrid approach where they use the best of both the APIs.
注意:大多数数据科学家使用混合方法,他们使用两种 API 中的优点。
Lastly, the Scala community often turns out to be a lot less helpful to programmers. This makes Python a much more valuable language to learn. If you have enough experience with any statically typed programming language like Java, you can stop worrying about not using Scala altogether.
最后,Scala 社区对程序员的帮助通常要小得多。这使得 Python 成为非常有价值的学习。如果您对任何静态类型编程语言(如 Java)有足够的经验,您就可以不必担心完全不使用 Scala。

