Original question: http://stackoverflow.com/questions/41303037/
Note: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors on StackOverflow (not me).
Why does Spark application fail with “ClassNotFoundException: Failed to find data source: kafka” as uber-jar with sbt assembly?
Asked by benjguin
I'm trying to run a sample like StructuredKafkaWordCount. I started with the Spark Structured Streaming Programming guide.
My code is
package io.boontadata.spark.job1
import org.apache.spark.sql.SparkSession
object DirectKafkaAggregateEvents {
  val FIELD_MESSAGE_ID = 0
  val FIELD_DEVICE_ID = 1
  val FIELD_TIMESTAMP = 2
  val FIELD_CATEGORY = 3
  val FIELD_MEASURE1 = 4
  val FIELD_MEASURE2 = 5
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <subscribeType> sample value: subscribe
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }
    val Array(bootstrapServers, subscribeType, topics) = args
    val spark = SparkSession
      .builder
      .appName("boontadata-spark-job1")
      .getOrCreate()
    import spark.implicits._
    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
    // Generate running word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
I added the following sbt files:
build.sbt:
name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"
// META-INF discarding
assemblyMergeStrategy in assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}
I also added project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
This creates an uber-jar with the non-provided jars.
I submit with the following line:
spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic
but I get this runtime error:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$$anonfun$apply.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$$anonfun$apply.apply(DataSource.scala:132)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun.apply(DataSource.scala:132)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
        ... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook
Is there a way to know which class is not found, so that I can search the maven.org repo for that class?
The lookupDataSource source code seems to be at line 543 of https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala but I couldn't find a direct link to the Kafka data source...
Complete source code is here: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f
Accepted answer by Sree Eedupuganti
I tried it like this and it works for me. Submit like this and let me know if you have any issues:
./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar
Answer by Jacek Laskowski
The issue is the following section in build.sbt:
// META-INF discarding
assemblyMergeStrategy in assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}
It says that all META-INF entries should be discarded, including the "code" that makes data source aliases (e.g. kafka) work.
But the META-INF files are very important for kafka (and other aliases of streaming data sources) to work.
For the kafka alias to work, Spark SQL uses META-INF/services/org.apache.spark.sql.sources.DataSourceRegister with the following entry:
org.apache.spark.sql.kafka010.KafkaSourceProvider
KafkaSourceProvider is responsible for registering the kafka alias with the proper streaming data source, i.e. KafkaSource.
Just to check that the real code is indeed available, while the "code" that registers the alias is not, you could use the kafka data source by its fully-qualified name (not the alias) as follows:
spark.readStream.
  format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
  load
You will see other problems due to missing options like kafka.bootstrap.servers, but... we're digressing.
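As an extra sanity check, a minimal sketch (assuming a spark-shell started with the uber-jar on its classpath): list every DataSourceRegister registration file that is actually visible to the classloader.
// minimal sketch: list the registration files the alias lookup relies on
import scala.collection.JavaConverters._
val regFiles = Thread.currentThread().getContextClassLoader
  .getResources("META-INF/services/org.apache.spark.sql.sources.DataSourceRegister")
  .asScala
  .toList
// with the discard strategy above, the uber-jar contributes no such file,
// so no entry registering the kafka alias is ever seen
regFiles.foreach(println)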
A solution is to MergeStrategy.concat all META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files (that would create an uber-jar with all data sources, incl. the kafka data source):
case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
Answer by ssice
In my case I also got this error while compiling with sbt, and the cause was that sbt assembly was not including the spark-sql-kafka-0-10_2.11 artifact as part of the fat jar.
(Comments are very welcome here. The dependency was not given a scope, so it should not be assumed to be "provided".)
So I switched to deploying a normal (slim) jar and including the dependencies with the --jars parameter to spark-submit.
In order to gather all dependencies in one place, you can add retrieveManaged := true to your sbt project settings, or you can issue the following in the sbt console:
> set retrieveManaged := true
> package
That should bring all dependencies into the lib_managed folder.
Then you can pass all those files to spark-submit; with bash, for example, you can use something like this:
cd /path/to/your/project
JARLIST=$(find lib_managed -name '*.jar'| paste -sd , -)
spark-submit [other-args] --jars "$JARLIST" target/your-app-1.0-SNAPSHOT.jar
Answer by Algomeister
This builds on Jacek Laskowski's answer.
Those of you building your project with Maven can try this out. Add the line mentioned below to your maven-shade-plugin:
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
I've put down the plugin code for the pom file as an example to show where to add the line.
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.1.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>
                            META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
                        </resource>
                    </transformer>
                </transformers>
                <finalName>${project.artifactId}-${project.version}-uber</finalName>
            </configuration>
        </execution>
    </executions>
</plugin>
Please excuse my formatting skills.
Answer by dalin qin
I'm using Spark 2.1 and faced the very same problem. My workaround is:
1)  spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
2) cd ~/.ivy2/jars; here you are, all the needed jars are now in this folder
3) copy all the jars in this folder to all the nodes (you can create a specific folder to hold them)
4) add the folder name to spark.driver.extraClassPath and spark.executor.extraClassPath, e.g. spark.driver.extraClassPath=/opt/jars/*:your_other_jars
5) spark-submit --class ClassNm --Other-Options YourJar.jar works fine now
Answer by Falco Winkler
I am using Gradle as a build tool and the shadowJar plugin to create the uber-jar. The solution was simply to add a file
src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister  
to the project.
In this file you need to put, line by line, the class names of the data sources you use; in this case it would be org.apache.spark.sql.kafka010.KafkaSourceProvider (you can find that class name, for example, here).
The reason is that Spark uses the Java ServiceLoader in its internal dependency management mechanisms.
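As a rough sketch of that mechanism (not Spark's actual internal code), assuming a spark-shell with the Kafka source jar on its classpath, you can enumerate the registered providers and the aliases they expose yourself:
// sketch: enumerate DataSourceRegister implementations and their aliases
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

val providers = ServiceLoader.load(classOf[DataSourceRegister]).asScala.toList
// should print "kafka" among others once the service file made it into the jar
providers.map(_.shortName).foreach(println)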
Full example here.
Answer by Raghav
I solved it by downloading the jar file to the driver system. From there, I supplied the jar to spark-submit with the --jars option.
Also to be noted: I was packaging the whole Spark 2.1 environment in my uber-jar (since my cluster is still on 1.6.1). For some reason, it's not picked up when included in the uber-jar.
spark-submit --jars /ur/path/spark-sql-kafka-0-10_2.11-2.1.0.jar --class ClassNm --Other-Options YourJar.jar

