
Disclaimer: this page is translated from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute the original authors (not me). Original question: http://stackoverflow.com/questions/26725463/

Date: 2020-11-02 10:29:36  Source: igfitidea

Kafka consumer in Spark Streaming

java · apache-spark · apache-zookeeper · apache-kafka · spark-streaming

Asked by DilTeam

Trying to write a Spark Streaming job that consumes messages from Kafka. Here's what I have so far:


1) Started Zookeeper.
2) Started Kafka Server.
3) Sent a few messages to the server. I can see them when I run the following:


bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning


4) Now trying to write a program to count # of messages coming in within 5 minutes.


The code looks something like this:


    Map<String, Integer> map = new HashMap<String, Integer>();
    map.put("mytopic", 1);   // value = number of consumer threads for the topic

    JavaStreamingContext ssc = new JavaStreamingContext(
            sparkUrl, "Spark Streaming", new Duration(5 * 60 * 1000), sparkHome, new String[]{jarFile});

    // createStream(ssc, zkQuorum, groupId, topics)
    JavaPairReceiverInputDStream<String, String> tweets =
            KafkaUtils.createStream(ssc, "localhost:2181", "1", map);

Not sure what value to use for the 3rd argument (consumer group). When I run this I get “Unable to connect to zookeeper server”. But Zookeeper is running on port 2181; otherwise step #3 would not have worked.


Seems like I am not using KafkaUtils.createStream properly. Any ideas?


Answered by Denis Makarenko

There is no such thing as default consumer group. You can use an arbitrary non-empty string there. If you have only one consumer, its consumer group doesn't really matter. If there are two or more consumers, they can either be a part of the same consumer group or belong to different consumer groups.


From http://kafka.apache.org/documentation.html:


Consumers


...


If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.


I think the problem may be in the 'topics' parameter. From the Spark docs:


Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread


You only specified a single consumer thread for your topic, namely '1'. Depending on the broker's setting (num.partitions), there may be more than one partition, and your messages may be sent to other partitions which aren't read by your program.
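To make this concrete, here is a minimal sketch of the topics map, assuming (hypothetically) that the topic was created with 3 partitions. The value in the map is a thread count, not a partition id:

```java
import java.util.HashMap;
import java.util.Map;

public class TopicsMapSketch {
    public static void main(String[] args) {
        // Map of topic name -> number of consumer threads to use for it.
        // The value is a THREAD COUNT, not a partition id: with a topic that
        // has 3 partitions (a hypothetical number), asking for 3 threads lets
        // each partition be consumed in its own thread, as the Spark docs say.
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("mytopic", 3);

        System.out.println("threads for mytopic: " + topics.get("mytopic"));
    }
}
```

This map would then be passed as the last argument of KafkaUtils.createStream in place of the original map that requested only one thread.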


Besides, I believe the partitionIds are 0 based. So if you have only one partition, it will have the id equal to 0.


Answered by BG GGN

I think you should specify the IP of the zookeeper host instead of localhost. Also, the third argument is the consumer group name; it can be any name you like. It matters when you have more than one consumer tied to the same group, since topic partitions are distributed among them accordingly. Your tweets line should be:


JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "x.x.x.x", "dummy-group", map);

Answered by Knight71

I was facing the same issue. Here is the solution that worked for me.


  • The number of cores allocated to the Spark Streaming application must be greater than the number of receivers; otherwise the system will receive data but not be able to process it. Spark Streaming therefore needs a minimum of two cores, so the spark-submit command should request at least two.
  • The kafka-clients jar (kafka-clients-<version>.jar) should be included in the list of dependent jars passed to spark-submit.
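Putting both points together, a submit command might look roughly like the following. The class name, jar names, and versions here are illustrative placeholders, not values from the question:

```shell
# "local[2]" requests 2 local cores: one for the Kafka receiver thread,
# at least one more for actually processing the received batches.
# Jar names and versions below are hypothetical examples.
spark-submit \
  --class com.example.KafkaMessageCount \
  --master "local[2]" \
  --jars kafka-clients-0.8.2.2.jar,spark-streaming-kafka_2.10-1.6.3.jar \
  my-streaming-app.jar
```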

Answered by swatisinghi

If zookeeper is running on the same machine as your streaming application, then "localhost:2181" will work. Otherwise, you have to specify the address of the host where zookeeper is running and ensure that the machine on which the streaming app runs can reach the zookeeper host on port 2181.


Answered by user553182

I think that in your code, the second argument to the KafkaUtils.createStream call should be the host:port of the Kafka server, not the zookeeper host and port. Check that once.


EDIT: Kafka Utils API Documentation


As per the documentation above, it should be the zookeeper quorum, so the Zookeeper hostname and port should be used.


zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
