
Warning: this content is taken from Stack Overflow and is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/33811882/


How do I unit test PySpark programs?

Tags: python, unit-testing, apache-spark, pyspark

Asked by 0111001101110000

My current Java/Spark Unit Test approach works (detailed here) by instantiating a SparkContext using "local" and running unit tests using JUnit.

The code has to be organized to do I/O in one function and then call another with multiple RDDs.
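
A minimal PySpark sketch of that separation (the function names here are hypothetical, purely to illustrate the pattern) could look like:

def transform(rdd):
    # Pure transformation logic: easy to unit test against a small in-memory RDD.
    return rdd.map(lambda line: line.strip().lower()).filter(bool)


def run(sc, input_path, output_path):
    # All I/O lives here; the transformation above is tested separately.
    transform(sc.textFile(input_path)).saveAsTextFile(output_path)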

This works great. I have a highly tested data transformation written in Java + Spark.

Can I do the same with Python?

How would I run Spark unit tests with Python?

Answered by santon

I use pytest, which allows test fixtures, so you can instantiate a pyspark context and inject it into all of your tests that require it. Something along the lines of:

import pytest
from pyspark import SparkConf, SparkContext


@pytest.fixture(scope="session",
                params=[pytest.mark.spark_local('local'),
                        pytest.mark.spark_yarn('yarn')])
def spark_context(request):
    # Each param carries a mark, so tests can be selected with
    # "py.test -m spark_local" or "py.test -m spark_yarn".
    if request.param == 'local':
        conf = (SparkConf()
                .setMaster("local[2]")
                .setAppName("pytest-pyspark-local-testing"))
    elif request.param == 'yarn':
        conf = (SparkConf()
                .setMaster("yarn-client")
                .setAppName("pytest-pyspark-yarn-testing")
                .set("spark.executor.memory", "1g")
                .set("spark.executor.instances", 2))

    sc = SparkContext(conf=conf)
    # Register the finalizer only after the context has actually been created.
    request.addfinalizer(lambda: sc.stop())
    return sc

def test_something_that_requires_sc(spark_context):
    # Function names must start with "test_" for pytest to collect them.
    assert spark_context.textFile('/path/to/a/file').count() == 10

Then you can run the tests in local mode by calling py.test -m spark_local, or on YARN with py.test -m spark_yarn. This has worked pretty well for me.
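
The spark_local and spark_yarn marks come from the parametrized fixture above. If your pytest version warns about unknown marks, one option (an assumption about your setup, not part of the original answer) is to register them in pytest.ini:

[pytest]
markers =
    spark_local: tests run against a local Spark master
    spark_yarn: tests run against a YARN cluster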

Answered by Vikas Kawadia

I'd recommend using py.test as well. py.test makes it easy to create reusable SparkContext test fixtures and use them to write concise test functions. You can also specialize fixtures (to create a StreamingContext, for example, as sketched below) and use one or more of them in your tests.
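
For example, a specialized StreamingContext fixture layered on a spark_context fixture might look like the sketch below (the fixture name and the 1-second batch interval are assumptions):

import pytest
from pyspark.streaming import StreamingContext


@pytest.fixture
def streaming_context(spark_context):
    # Reuse the shared SparkContext for a short-lived StreamingContext.
    ssc = StreamingContext(spark_context, 1)
    yield ssc
    # Stop streaming but keep the shared SparkContext alive for other tests.
    ssc.stop(stopSparkContext=False)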

I wrote a blog post on Medium on this topic:

https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b

Here is a snippet from the post:

import pytest

import wordcount  # the module under test, defined in the blog post

pytestmark = pytest.mark.usefixtures("spark_context")


def test_do_word_counts(spark_context):
    """Test word counting.

    Args:
       spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello': 2, 'spark': 3, 'again': 1}
    assert results == expected_results
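
For context, do_word_counts is defined in the blog post; an implementation consistent with this test might look roughly like:

# wordcount.py (a sketch; the real implementation is in the blog post)
def do_word_counts(lines):
    """Count words in an RDD of lines and return a plain dict."""
    counts = (lines.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    return dict(counts.collect())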

Answered by Kamil Sindi

Here's a solution with pytest if you're using Spark 2.x and SparkSession. I'm also importing a third-party package.

import logging

import pytest
from pyspark.sql import SparkSession

def quiet_py4j():
    """Suppress spark logging for the test context."""
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)


@pytest.fixture(scope="session")
def spark_session(request):
    """Fixture for creating a spark context."""

    # spark.jars.packages pulls in the third-party spark-avro package mentioned above.
    spark = (SparkSession
             .builder
             .master('local[2]')
             .config('spark.jars.packages', 'com.databricks:spark-avro_2.11:3.0.1')
             .appName('pytest-pyspark-local-testing')
             .enableHiveSupport()
             .getOrCreate())
    request.addfinalizer(lambda: spark.stop())

    quiet_py4j()
    return spark


def test_my_app(spark_session):
    ...

Note that if using Python 3, I had to specify that via the PYSPARK_PYTHON environment variable:

import os
import sys

IS_PY2 = sys.version_info < (3,)

if not IS_PY2:
    # Make the executors use the same Python major version as the driver.
    os.environ['PYSPARK_PYTHON'] = 'python3'

Otherwise you get the error:

Exception: Python in worker has different version 2.7 than that in driver 3.5, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

Answered by Alex Markov

Some time ago I also faced the same issue, and after reading through several articles, forums, and some StackOverflow answers I ended up writing a small plugin for pytest: pytest-spark

I have been using it for a few months now, and the general workflow looks good on Linux:

  1. Install Apache Spark (set up a JVM + unpack Spark's distribution to some directory).
  2. Install "pytest" + the "pytest-spark" plugin.
  3. Create "pytest.ini" in your project directory and specify the Spark location there (see the example after this list).
  4. Run your tests with pytest as usual.
  5. Optionally, you can use the "spark_context" fixture provided by the plugin in your tests - it tries to minimize Spark's logs in the output.
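
A minimal pytest.ini for step 3 might look like this (the spark_home path is an assumption about where Spark was unpacked):

[pytest]
spark_home = /opt/spark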

Answered by Jorge Leitao

Assuming you have pyspark installed, you can use the class below to unit test it with unittest:

import unittest
import pyspark


class PySparkTestCase(unittest.TestCase):
    """Base class that shares one SparkContext and SQLContext across a class's tests."""

    @classmethod
    def setUpClass(cls):
        conf = pyspark.SparkConf().setMaster("local[2]").setAppName("testing")
        cls.sc = pyspark.SparkContext(conf=conf)
        cls.spark = pyspark.SQLContext(cls.sc)

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()

Example:

class SimpleTestCase(PySparkTestCase):

    def test_with_rdd(self):
        test_input = [
            ' hello spark ',
            ' hello again spark spark'
        ]

        input_rdd = self.sc.parallelize(test_input, 1)

        from operator import add

        results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
        # Compare as a dict, since the ordering of reduceByKey output is not guaranteed.
        self.assertEqual(dict(results), {'hello': 2, 'spark': 3, 'again': 1})

    def test_with_df(self):
        df = self.spark.createDataFrame(data=[[1, 'a'], [2, 'b']], 
                                        schema=['c1', 'c2'])
        self.assertEqual(df.count(), 2)

Note that this creates one context per class. Use setUp instead of setUpClass to get a fresh context per test, as sketched below; that typically adds a lot of overhead to the test run, since creating a new Spark context is currently expensive.
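
A per-test variant of the class above could look like this sketch (expect noticeably slower runs):

class PySparkPerTestCase(unittest.TestCase):

    def setUp(self):
        # A fresh context for every test: better isolation, much slower.
        conf = pyspark.SparkConf().setMaster("local[2]").setAppName("testing")
        self.sc = pyspark.SparkContext(conf=conf)

    def tearDown(self):
        self.sc.stop()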