Note: this page is based on a popular StackOverflow Q&A, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/35623057/

Can a Kafka consumer (0.8.2.2) read messages in batch

Tags: java, apache-kafka, kafka-consumer-api

Asked by shiv455

As per my understanding, a Kafka consumer reads messages from an assigned partition sequentially...

We are planning to have multiple Kafka consumers (Java) with the same group id. If each one reads sequentially from its assigned partition, how can we achieve high throughput? For example, the producer publishes about 40 messages per second while a consumer processes only 1 message per second; we can have multiple consumers, but we can't have 40, right? Correct me if I'm wrong...

And in our case, the consumer must commit the offset only after the message is processed successfully; otherwise the message will be reprocessed... Is there any better solution?

Answered by Morgan Kenyon

Based on your question clarification.

A Kafka consumer can read multiple messages at a time. But a Kafka consumer doesn't really read messages; it's more correct to say that a consumer reads a certain number of bytes, and the size of the individual messages then determines how many messages are read. Reading through the Kafka Consumer Configs, you're not allowed to specify how many messages to fetch; instead you specify a max/min data size that a consumer can fetch. However many messages fit inside that range is how many you will get. As you have pointed out, you will always get messages sequentially.

Related Consumer Configs (for 0.9.0.0 and greater)

  • fetch.min.bytes
  • max.partition.fetch.bytes
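
For illustration, here is a minimal sketch of how these settings could be applied with the 0.9+ KafkaConsumer. The broker address, group id, topic name, and byte values below are placeholder assumptions, not values from the answer:

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
  props.put("group.id", "my-group");                // placeholder group id
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  // the broker waits until at least this many bytes are available before answering a fetch
  props.put("fetch.min.bytes", "1024");
  // upper bound on the data returned per partition in a single fetch
  props.put("max.partition.fetch.bytes", "1048576");

  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic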

UPDATE

Using your example from the comments, "my understanding is if I specify in config to read 10 bytes and if each message is 2 bytes, the consumer reads 5 messages at a time." That is true. Your next statement, "that means the offsets of these 5 messages were random within the partition", is false. Reading sequentially doesn't mean one by one; it just means that the messages remain ordered. You are able to batch items and have them remain sequential/ordered. Take the following examples.

In a Kafka log, suppose there are 10 messages (each 2 bytes) with the following offsets: [0,1,2,3,4,5,6,7,8,9].

If you read 10 bytes, you'll get a batch containing the messages at offsets [0,1,2,3,4].

If you read 6 bytes, you'll get a batch containing the messages at offsets [0,1,2].

If you read 6 bytes, then another 6 bytes, you'll get two batches containing the messages [0,1,2] and [3,4,5].

If you read 8 bytes, then 4 bytes, you'll get two batches containing the messages [0,1,2,3] and [4,5].

Update: Clarifying Committing

I'm not 100% sure how committing works; I've mainly worked with Kafka from a Storm environment, where the provided KafkaSpout automatically commits Kafka messages.

But looking through the 0.9.0.1 Consumer APIs, which I would recommend you do too, there seem to be three methods in particular that are relevant to this discussion.

  • poll(long timeout)
  • commitSync()
  • commitSync(java.util.Map offsets)

The poll method retrieves messages; it could be only 1, it could be 20. For your example, let's say 3 messages were returned: [0,1,2]. You now have those three messages, and it's up to you to determine how to process them. You could process them 0 => 1 => 2, 1 => 0 => 2, or 2 => 0 => 1; it just depends. However you process them, afterwards you'll want to commit, which tells the Kafka server you're done with those messages.
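
As a minimal sketch of that flow (reusing the consumer configured above; process(...) is a hypothetical placeholder for your own logic):

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;

  // poll() returns whatever batch of messages is currently available
  ConsumerRecords<String, String> records = consumer.poll(1000);
  for (ConsumerRecord<String, String> record : records) {
      process(record.value()); // order within a partition is preserved
  }
  consumer.commitSync(); // tells Kafka you're done with everything from the last poll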

Using commitSync() commits everything returned by the last poll; in this case it would commit offsets [0,1,2].

On the other hand, if you choose to use commitSync(java.util.Map offsets), you can manually specify which offsets to commit. If you're processing the messages in order, you can process offset 0 and commit it, process offset 1 and commit it, and finally process offset 2 and commit.
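
A sketch of that per-offset pattern (same assumptions as above; note that the new consumer expects the committed value to be the offset of the next message to read, hence the + 1):

  import java.util.Collections;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  import org.apache.kafka.common.TopicPartition;

  ConsumerRecords<String, String> records = consumer.poll(1000);
  for (ConsumerRecord<String, String> record : records) {
      process(record.value()); // hypothetical processing step
      // commit just this message: the committed offset is the position
      // of the next message to consume, i.e. record.offset() + 1
      consumer.commitSync(Collections.singletonMap(
          new TopicPartition(record.topic(), record.partition()),
          new OffsetAndMetadata(record.offset() + 1)));
  }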

All in all, Kafka gives you the freedom to process messages as you desire: you can process them sequentially or in an entirely different order of your choosing.

Answered by Marko Bonaci

To achieve parallelism, which seems to be what you're asking about, you use topic partitions (you split a topic into N parts, called partitions). Then, in the consumer, you spawn multiple threads to consume from those partitions.
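
A rough sketch of that threading model, assuming the 0.9+ consumer (the thread count, consumerProps(), process(...), and the topic name are placeholders; each thread gets its own KafkaConsumer instance, since the consumer is not thread-safe):

  import java.util.Collections;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  int numThreads = 4; // ideally no more than the number of partitions in the topic
  ExecutorService pool = Executors.newFixedThreadPool(numThreads);
  for (int i = 0; i < numThreads; i++) {
      pool.submit(() -> {
          // one consumer per thread; the group coordinator assigns each a share of the partitions
          KafkaConsumer<String, String> c = new KafkaConsumer<>(consumerProps());
          c.subscribe(Collections.singletonList("my-topic"));
          while (true) {
              ConsumerRecords<String, String> records = c.poll(1000);
              for (ConsumerRecord<String, String> record : records) {
                  process(record); // hypothetical per-message processing
              }
          }
      });
  }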

On the producer side, you either publish messages to a random partition (the default), or you provide Kafka with a message attribute from which to calculate a hash (if ordering is required), which makes sure that all messages with the same hash go to the same partition.
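
For example, with the Java producer, supplying a key makes Kafka hash it to choose the partition, so all messages with the same key land in the same partition and keep their relative order (the broker address, topic, and key below are placeholders):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  // without a key, messages are spread across partitions; with a key, hash(key) picks the partition
  producer.send(new ProducerRecord<>("my-topic", "user-42", "some payload"));
  producer.close();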

EDIT (example of offset commit request):
This is how I did it. All methods that are not provided are non-essential.

  // Imports assumed for the Kafka 0.8.2 (old) client API. ConsumerContext,
  // BrokerInfo, processKafkaOffsetCommitError and LOG are the author's own
  // helpers and are not shown here.
  import java.util.HashMap;
  import java.util.Map;
  import kafka.common.ErrorMapping;
  import kafka.common.OffsetAndMetadata;
  import kafka.common.TopicAndPartition;
  import kafka.javaapi.OffsetCommitResponse;
  import kafka.javaapi.consumer.SimpleConsumer;

  /**
   * Commits the provided offset for the current client (i.e. unique
   * topic/partition/clientName combination).
   *
   * @param offset the offset to commit
   * @return {@code true} if the commit succeeded
   * @throws Exception if the commit failed
   */
  public static boolean commitOffset(String topic, int partition, String clientName, SimpleConsumer consumer,
      long offset) throws Exception {
    try {
      TopicAndPartition tap = new TopicAndPartition(topic, partition);
      // -1L means no explicit commit timestamp
      OffsetAndMetadata offsetMetaAndErr = new OffsetAndMetadata(offset, OffsetAndMetadata.NoMetadata(), -1L);
      Map<TopicAndPartition, OffsetAndMetadata> mapForCommitOffset = new HashMap<>(1);
      mapForCommitOffset.put(tap, offsetMetaAndErr);

      // correlation id 1; the storage type selects ZooKeeper- or Kafka-based offset storage
      kafka.javaapi.OffsetCommitRequest offsetCommitReq = new kafka.javaapi.OffsetCommitRequest(
          ConsumerContext.getMainIndexingConsumerGroupId(), mapForCommitOffset, 1, clientName,
          ConsumerContext.getOffsetStorageType());

      OffsetCommitResponse offsetCommitResp = consumer.commitOffsets(offsetCommitReq);
      Short errCode = (Short) offsetCommitResp.errors().get(tap);
      if (errCode != 0) {
        // surface the broker-side error, then rethrow it as an exception
        processKafkaOffsetCommitError(tap, offsetCommitResp, BrokerInfo.of(consumer.host()));
        ErrorMapping.maybeThrowException(errCode);
      }
      LOG.debug("Successfully committed offset [{}].", offset);
    } catch (Exception e) {
      LOG.error("Error while committing offset [" + offset + "].", e);
      throw e;
    }
    return true;
  }

Answered by charan teja

You can consume the messages in batches and process them in a batched manner. The fetch.max.wait.ms property controls how long the broker will wait for new messages to accumulate (up to fetch.min.bytes) before answering the consumer's poll.
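
A minimal sketch of that batched style with the 0.9+ consumer (the property values and processBatch(...) are placeholders; props and consumer are set up as in the earlier sketches):

  import java.util.List;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.common.TopicPartition;

  props.put("fetch.max.wait.ms", "500");  // broker may wait up to 500 ms...
  props.put("fetch.min.bytes", "65536");  // ...for this much data, so polls return larger batches

  ConsumerRecords<String, String> records = consumer.poll(1000);
  for (TopicPartition tp : records.partitions()) {
      // handle each partition's slice of the batch as one unit, preserving per-partition order
      List<ConsumerRecord<String, String>> batch = records.records(tp);
      processBatch(batch); // hypothetical batched processing
  }
  consumer.commitSync();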