Scala: How to write unit tests in Spark 2.0+?

Disclaimer: the content below is taken from a StackOverflow question and its answers and is provided under the CC BY-SA 4.0 license. If you reuse or share it, you must keep the same license and attribute it to the original authors (not me): StackOverflow. Original: http://stackoverflow.com/questions/43729262/


How to write unit tests in Spark 2.0+?

Tags: scala, unit-testing, apache-spark, junit, apache-spark-sql

Asked by bbarker

I've been trying to find a reasonable way to test SparkSession with the JUnit testing framework. While there seem to be good examples for SparkContext, I couldn't figure out how to get a corresponding example working for SparkSession, even though it is used in several places internally in spark-testing-base. I'd be happy to try a solution that doesn't use spark-testing-base as well if it isn't really the right way to go here.


Simple test case (complete MWE project with build.sbt):


import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir/Documents/GitHub/sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

The result of running this with JUnit is an NPE at the load line:


java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Note it shouldn't matter that the file being loaded exists or not; in a properly configured SparkSession, a more sensible error will be thrown.


Answered by Vidya

Thank you for putting this outstanding question out there. For some reason, when it comes to Spark, everyone gets so caught up in the analytics that they forget about the great software engineering practices that emerged the last 15 years or so. This is why we make it a point to discuss testing and continuous integration (among other things like DevOps) in our course.


A Quick Aside on Terminology


A true unit test means you have complete control over every component in the test. There can be no interaction with databases, REST calls, file systems, or even the system clock; everything has to be "doubled" (e.g. mocked, stubbed, etc.) as Gerard Meszaros puts it in xUnit Test Patterns. I know this seems like semantics, but it really matters. Failing to understand this is one major reason why you see intermittent test failures in continuous integration.


We Can Still Unit Test


So given this understanding, unit testing an RDD is impossible. However, there is still a place for unit testing when developing analytics.


Consider a simple operation:


rdd.map(foo).map(bar)

Here foo and bar are simple functions. Those can be unit tested in the normal way, and they should be with as many corner cases as you can muster. After all, why do they care where they are getting their inputs from, whether it is a test fixture or an RDD?

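If foo and bar were, say, plain Scala functions (the names and bodies below are hypothetical stand-ins, not anything from the original post), they can be exercised with ordinary ScalaTest assertions and no Spark at all. A minimal sketch:

import org.scalatest.FunSuite

// Hypothetical stand-ins for the functions passed to rdd.map above.
object Transforms {
  def foo(n: Int): Int = n * 2          // example transformation step
  def bar(n: Int): String = s"value-$n" // example formatting step
}

class TransformsSpec extends FunSuite {
  test("foo doubles its input, including corner cases") {
    assert(Transforms.foo(0) === 0)
    assert(Transforms.foo(-3) === -6)
  }

  test("bar formats the value") {
    assert(Transforms.bar(7) === "value-7")
  }
}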

Don't Forget the Spark Shell


This isn't testing per se, but in these early stages you also should be experimenting in the Spark shell to figure out your transformations and especially the consequences of your approach. For example, you can examine physical and logical query plans, partitioning strategy and preservation, and the state of your data with many different functions like toDebugString, explain, glom, show, printSchema, and so on. I will let you explore those.

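As a rough illustration (the DataFrame and column names here are made up for the sketch), a quick spark-shell session might look like this:

// In spark-shell, a SparkSession is already available as `spark`
// and spark.implicits._ is already imported.
val df = spark.range(100).withColumn("bucket", $"id" % 4)

df.printSchema()               // inspect the schema
df.explain(true)               // logical and physical plans
df.groupBy("bucket").count().show()

val rdd = df.rdd.repartition(8)
println(rdd.toDebugString)     // lineage and partitioning
rdd.glom().map(_.length).collect().foreach(println)  // rows per partition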

You can also set your master to local[2] in the Spark shell and in your tests to identify any problems that may only arise once you start to distribute work.

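For instance (a sketch; the app name is arbitrary), start the shell with spark-shell --master "local[2]", or build the session the same way in a test:

import org.apache.spark.sql.SparkSession

// local[2] runs two worker threads, which can surface serialization and
// partitioning problems that a single-threaded local master hides.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .appName("local-two-thread-test") // arbitrary name
  .getOrCreate()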

Integration Testing with Spark


Now for the fun stuff.


In order to integration test Spark after you feel confident in the quality of your helper functions and RDD/DataFrame transformation logic, it is critical to do a few things (regardless of build tool and test framework):


  • Increase JVM memory.
  • Enable forking but disable parallel execution (see the sbt sketch below).
  • Use your test framework to accumulate your Spark integration tests into suites, and initialize the SparkContext before all tests and stop it after all tests.
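
With sbt, for example, those three points usually come down to a few settings in build.sbt; a minimal sketch (the memory values are placeholders to tune for your own suite):

// build.sbt (sketch)
fork in Test := true                                  // run tests in a forked JVM
parallelExecution in Test := false                    // one Spark context at a time
javaOptions in Test ++= Seq("-Xms512M", "-Xmx2048M")  // give the forked JVM more heap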

With ScalaTest, you can mix in BeforeAndAfterAll (which I prefer generally) or BeforeAndAfterEach as @ShankarKoirala does to initialize and tear down Spark artifacts. I know this is a reasonable place to make an exception, but I really don't like those mutable vars you have to use though.

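A minimal BeforeAndAfterAll sketch (the suite and app names are made up) that keeps one session for the whole suite might look like this, mutable var and all:

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class OneSessionPerSuiteTest extends FunSuite with BeforeAndAfterAll {

  @transient private var spark: SparkSession = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    spark = SparkSession.builder()
      .master("local[2]")
      .appName("suite-level session") // arbitrary name
      .getOrCreate()
  }

  override def afterAll(): Unit = {
    try spark.stop()
    finally super.afterAll()
  }

  test("a trivial count") {
    assert(spark.range(10).count() === 10)
  }
}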

The Loan Pattern


Another approach is to use the Loan Pattern.


For example (using ScalaTest):


class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkContext) => Any) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
}

As you can see, the Loan Pattern makes use of higher-order functions to "loan" the SparkContext to the test and then to dispose of it after it's done.


Suffering-Oriented Programming (Thanks, Nathan)


It is totally a matter of preference, but I prefer to use the Loan Pattern and wire things up myself as long as I can before bringing in another framework. Aside from just trying to stay lightweight, frameworks sometimes add a lot of "magic" that makes debugging test failures hard to reason about. So I take a Suffering-Oriented Programming approach--where I avoid adding a new framework until the pain of not having it is too much to bear. But again, this is up to you.


The best choice for that alternate framework is of course spark-testing-base as @ShankarKoirala mentioned. In that case, the test above would look like this:


class MySpec extends WordSpec with Matchers with SharedSparkContext {
  "My analytics" should {
    "calculate the right thing" in {
      val data = Seq(...)
      val rdd = sc.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

Note how I didn't have to do anything to deal with the SparkContext. SharedSparkContext gave me all that--with sc as the SparkContext--for free. Personally though, I wouldn't bring in this dependency for just this purpose since the Loan Pattern does exactly what I need for that. Also, with so much unpredictability that happens with distributed systems, it can be a real pain to have to trace through the magic that happens in the source code of a third-party library when things go wrong in continuous integration.


Now where spark-testing-base really shines is with the Hadoop-based helpers like HDFSClusterLike and YARNClusterLike. Mixing those traits in can really save you a lot of setup pain. Another place where it shines is with the ScalaCheck-like properties and generators--assuming of course you understand how property-based testing works and why it is useful. But again, I would personally hold off on using it until my analytics and my tests reach that level of sophistication.


"Only a Sith deals in absolutes." -- Obi-Wan Kenobi


Of course, you don't have to choose one or the other either. Perhaps you could use the Loan Pattern approach for most of your tests and spark-testing-base only for a few, more rigorous tests. The choice isn't binary; you can do both.


Integration Testing with Spark Streaming


Finally, I would just like to present a snippet of what a Spark Streaming integration test setup with in-memory values might look like without spark-testing-base:


import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream[(String, String)] = streamingContext.queueStream(strings)
strings += rdd

This is simpler than it looks. It really just turns a sequence of data into a queue to feed to the DStream. Most of it is really just boilerplate setup that works with the Spark APIs. Regardless, you can compare this with StreamingSuiteBase as found in spark-testing-base to decide which you prefer.


This might be my longest post ever, so I will leave it here. I hope others chime in with other ideas to help improve the quality of our analytics with the same agile software engineering practices that have improved all other application development.


And with apologies for the shameless plug, you can check out our course Analytics with Apache Spark, where we address a lot of these ideas and more. We hope to have an online version soon.


Answered by koiralo

You can write a simple test with FunSuite and BeforeAndAfterEach like below


test ("test name") {//implementation and assert}

You don't need to create functions in the test; you can simply write it as:


test ("test name") {//implementation and assert}

Holden Karau has written the really nice test library spark-testing-base.


You should check it out; below is a simple example:


class TestSharedSparkContext extends FunSuite with SharedSparkContext {

  val expectedResult = List(("a", 3),("b", 2),("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}

Hope this helps!


Answered by Powers

I like to create a SparkSessionTestWrapper trait that can be mixed in to test classes. Shankar's approach works, but it's prohibitively slow for test suites with multiple files.


import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("spark session").getOrCreate()
  }

}

The trait can be used as follows:


class DatasetSpec extends FunSpec with SparkSessionTestWrapper {

  import spark.implicits._

  describe("#count") {

    it("returns a count of all the rows in a DataFrame") {

      val sourceDF = Seq(
        ("jets"),
        ("barcelona")
      ).toDF("team")

      assert(sourceDF.count === 2)

    }

  }

}

Check the spark-spec project for a real-life example that uses the SparkSessionTestWrapper approach.


Update


The spark-testing-base library automatically adds the SparkSession when certain traits are mixed in to the test class (e.g. when DataFrameSuiteBase is mixed in, you'll have access to the SparkSession via the spark variable).


I created a separate testing library called spark-fast-tests to give the users full control of the SparkSession when running their tests. I don't think a test helper library should set the SparkSession. Users should be able to start and stop their SparkSession as they see fit (I like to create one SparkSession and use it throughout the test suite run).


Here's an example of the spark-fast-tests assertSmallDatasetEquality method in action:


import com.github.mrpowers.spark.fast.tests.DatasetComparer
import org.apache.spark.sql.functions.col

class DatasetSpec extends FunSpec with SparkSessionTestWrapper with DatasetComparer {

  import spark.implicits._

  it("aliases a DataFrame") {

    val sourceDF = Seq(
      ("jose"),
      ("li"),
      ("luisa")
    ).toDF("name")

    val actualDF = sourceDF.select(col("name").alias("student"))

    val expectedDF = Seq(
      ("jose"),
      ("li"),
      ("luisa")
    ).toDF("student")

    assertSmallDatasetEquality(actualDF, expectedDF)

  }

}

Answered by Eugene Lopatkin

Since Spark 1.6 you could use SharedSparkContext or SharedSQLContext, which Spark uses for its own unit tests:

Spark 1.6 开始,您可以使用SharedSparkContextSharedSQLContextSpark 用于自己的单元测试:

class YourAppTest extends SharedSQLContext {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = sqlContext.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }
}

Since Spark 2.3, SharedSparkSession is available:


class YourAppTest extends SharedSparkSession {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = spark.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }
}

UPDATE:


Maven dependency:


"org.scalactic" %% "scalactic" % SCALATEST_VERSION
"org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test"
"org.apache.spark" %% "spark-core" % SPARK_VERSION % Test classifier "tests"
"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"

SBT dependency:


"org.scalactic" %% "scalactic" % SCALATEST_VERSION
"org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test"
"org.apache.spark" %% "spark-core" % SPARK_VERSION % Test classifier "tests"
"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"

In addition, you could check the test sources of Spark, where there is a huge set of various test suites.


UPDATE 2:


Apache Spark Unit Testing Part 1 — Core Components


Apache Spark Unit Testing Part 2 — Spark SQL


Apache Spark Unit Testing Part 3 — Streaming


Answered by sunitha

I could solve the problem with the code below.


The spark-hive dependency is added in the project pom.

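For reference, a sketch of what that pom entry might look like (the Scala suffix and version property are placeholders for your own build):

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>${spark.version}</version>
  <scope>test</scope>
</dependency>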

class DataFrameTest extends FunSuite with DataFrameSuiteBase {
  test("test dataframe") {
    val sparkSession = spark
    import sparkSession.implicits._
    val df = sparkSession.read.format("csv").load("path/to/csv")
    // rest of the operations.
  }
}

Answered by Thirupathi Chavati

Another way to unit test using JUnit:


import org.apache.spark.sql.SparkSession
import org.junit.Assert._
import org.junit.{After, Before, _}

@Test
class SessionSparkTest {
  var spark: SparkSession = _

  @Before
  def beforeFunction(): Unit = {
    //spark = SessionSpark.getSparkSession()
    spark = SparkSession.builder().appName("App Name").master("local").getOrCreate()
    System.out.println("Before Function")
  }

  @After
  def afterFunction(): Unit = {
    spark.stop()
    System.out.println("After Function")
  }

  @Test
  def testRddCount() = {
    val rdd = spark.sparkContext.parallelize(List(1, 2, 3))
    val count = rdd.count()
    assertTrue(3 == count)
  }

  @Test
  def testDfNotEmpty() = {
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._
    val numDf = spark.sparkContext.parallelize(List(1, 2, 3)).toDF("nums")
    assertFalse(numDf.head(1).isEmpty)
  }

  @Test
  def testDfEmpty() = {
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._
    val emptyDf = spark.sqlContext.createDataset(spark.sparkContext.emptyRDD[Num])
    assertTrue(emptyDf.head(1).isEmpty)
  }
}

case class Num(id: Int)