Scala Spark Streaming - read and write on Kafka topic

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/31590592/

Spark Streaming - read and write on Kafka topic

scala, apache-spark, apache-kafka, spark-streaming, spark-streaming-kafka

Asked by Chobeat

I am using Spark Streaming to process data between two Kafka queues, but I cannot seem to find a good way to write to Kafka from Spark. I have tried this:

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)

and it works as intended, but instantiating a new KafkaProducer for every message is clearly infeasible in a real context, so I'm trying to work around it.

I would like to keep a reference to a single instance for every process and access it when I need to send a message. How can I write to Kafka from Spark Streaming?

Accepted answer by Marius Soutier

My first advice would be to try to create a new instance in foreachPartition and measure if that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).

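For reference, a minimal sketch of that per-partition variant might look like the following (it reuses the input, brokers, and "output" topic names from the question; the last answer below shows essentially the same pattern):

input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    // One producer per partition task, shared by all records in that partition
    val producer = new KafkaProducer[String, String](props)
    try {
      partition.foreach { x =>
        producer.send(new ProducerRecord[String, String]("output", null, x))
      }
    } finally {
      producer.close() // flush buffered records before the task finishes
    }
  }
}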

Another option is to use an object pool as illustrated in this example:

https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

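To give a rough idea of the pooling approach (this is not the code from that repository), a producer pool can be built on Apache Commons Pool 2; the pool and factory classes below come from that library, while kafkaConfig, the "output" topic, and the message value are assumed placeholders:

import java.util.Properties

import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Tells the pool how to create, wrap, and dispose of producers.
class ProducerObjectFactory(config: Properties)
    extends BasePooledObjectFactory[KafkaProducer[String, String]] {
  override def create(): KafkaProducer[String, String] =
    new KafkaProducer[String, String](config)
  override def wrap(p: KafkaProducer[String, String]): PooledObject[KafkaProducer[String, String]] =
    new DefaultPooledObject(p)
  override def destroyObject(po: PooledObject[KafkaProducer[String, String]]): Unit =
    po.getObject.close()
}

// The pool must be created lazily on each executor (e.g. inside a singleton object),
// not on the driver, because producers are not serializable.
val kafkaConfig = new Properties() // assumed to contain bootstrap.servers and serializers
val producerPool = new GenericObjectPool(new ProducerObjectFactory(kafkaConfig))

// Inside foreachPartition: borrow a producer, use it, and always return it.
val producer = producerPool.borrowObject()
try {
  producer.send(new ProducerRecord[String, String]("output", "some message"))
} finally {
  producerPool.returnObject(producer)
}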

However, I found it hard to implement when using checkpointing.

Another version that is working well for me is a factory as described in the following blog post; you just have to check whether it provides enough parallelism for your needs (see the comments section):

http://allegro.tech/2015/08/spark-kafka-integration.html

Answered by Michael G. Noll

Yes, unfortunately Spark (1.x, 2.x) doesn't make it straightforward to write to Kafka in an efficient manner.

I'd suggest the following approach:

  • Use (and re-use) one KafkaProducer instance per executor process/JVM.

Here's the high-level setup for this approach:

  1. First, you must "wrap" Kafka's KafkaProducer because, as you mentioned, it is not serializable. Wrapping it allows you to "ship" it to the executors. The key idea here is to use a lazy val so that you delay instantiating the producer until its first use, which is effectively a workaround so that you don't need to worry about KafkaProducer not being serializable.
  2. You "ship" the wrapped producer to each executor by using a broadcast variable.
  3. Within your actual processing logic, you access the wrapped producer through the broadcast variable, and use it to write processing results back to Kafka.

The code snippets below work with Spark Streaming as of Spark 2.0.

Step 1: Wrapping KafkaProducer

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}

Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance

import java.util.Properties

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}

Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (for each executor)

import java.util.concurrent.Future

import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.spark.streaming.dstream.DStream

val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toStream
    metadata.foreach { metadata => metadata.get() }
  }
}

Hope this helps.

Answered by mrsrinivas

With Spark >= 2.2

Both read and write operations on Kafka are possible using the Structured Streaming API.

Build a stream from a Kafka topic

// Subscribe to a topic and read messages starting from the earliest available offsets
val ds = spark
  .readStream // use `read` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  // "endingOffsets" is only valid for batch (`read`) queries, so it is omitted here
  .load()

Read the key and value and apply a schema to both; for simplicity, we convert both of them to String type.

val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Since dsStruc has a schema, it accepts all SQL-style operations such as filter, agg, select, etc.

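For example, a simple transformation on dsStruc might look like this (assuming a SparkSession named spark with spark.implicits._ in scope; the column names key and value come from the cast above):

import spark.implicits._

// Drop records without a payload and count messages per key.
val nonEmpty = dsStruc.filter(_._2 != null)
val countsPerKey = nonEmpty.groupBy($"key").count()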

Write the stream to a Kafka topic

dsStruc
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .option("checkpointLocation", "/tmp/checkpoint-dir") // required by the Kafka sink; example path
  .start()

More configuration options are available for the Kafka integration, for both reading and writing.

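For illustration, a few commonly used read-side options look like this (the option names are part of the Spark Kafka source; the values are only examples):

val dsTuned = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("subscribe", "source-topic1")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "10000") // cap the number of records per micro-batch
  .option("failOnDataLoss", "false")       // don't fail the query when offsets have expired
  .load()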

Key artifacts to add to the application (SBT dependencies):

 "org.apache.spark" % "spark-core_2.11" % 2.2.0,
 "org.apache.spark" % "spark-streaming_2.11" % 2.2.0,
 "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % 2.2.0,

Answered by gcaliari

I was having the same issue and found this post.

The author solves the problem by creating one producer per executor. Instead of sending the producer itself, he broadcasts only a "recipe" for how to create a producer on an executor.

    val kafkaSink = sparkContext.broadcast(KafkaSink(conf))

He uses a wrapper that lazily creates the producer:

    class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

      lazy val producer = createProducer()

      def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
    }


    object KafkaSink {
      def apply(config: Map[String, Object]): KafkaSink = {
        val f = () => {
          val producer = new KafkaProducer[String, String](config)

          sys.addShutdownHook {
            producer.close()
          }

          producer
        }
        new KafkaSink(f)
      }
    }

The wrapper is serializable because the Kafka producer is initialized just before first use on an executor. The driver keeps the reference to the wrapper and the wrapper sends the messages using each executor's producer:

    dstream.foreachRDD { rdd =>
      rdd.foreach { message =>
        kafkaSink.value.send("topicName", message)
      }
    }

Answered by maasg

There is a Streaming Kafka Writer maintained by Cloudera (actually spun off from a Spark JIRA [1]). It basically creates a producer per partition, which amortizes the time spent to create 'heavy' objects over a (hopefully large) collection of elements.

The Writer can be found here: https://github.com/cloudera/spark-kafka-writer

Answered by lmm

Why is it infeasible? Fundamentally, each partition of each RDD is going to run independently (and may well run on a different cluster node), so you have to redo the connection (and any synchronization) at the start of each partition's task. If the overhead of that is too high, then you should increase the batch size in your StreamingContext until it becomes acceptable (obviously there's a latency cost to doing this).

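Increasing the batch interval is a one-line change where the StreamingContext is created; the 10-second value below is only an example:

// A larger batch interval means fewer, larger partitions per batch, so the
// per-partition connection cost is amortized over more records.
val ssc = new StreamingContext(sparkConf, Seconds(10))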

(If you're not handling thousands of messages in each partition, are you sure you need Spark Streaming at all? Would you do better with a standalone application?)

Answered by sainath reddy

This might be what you want to do. You basically create one producer for each partition of records.

input.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // One producer per partition, shared by every record in that partition
    val producer = new KafkaProducer[String, String](props)

    partitionOfRecords.foreach { case x: String =>
      println(x)
      val message = new ProducerRecord[String, String]("output", null, x)
      producer.send(message)
    }
  }
}

Hope that helps
