Java: Effective strategy to avoid duplicate messages in an Apache Kafka consumer

Notice: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/29647656/

Effective strategy to avoid duplicate messages in apache kafka consumer

Tags: java, message-queue, apache-kafka

Asked by Shades88

I have been studying Apache Kafka for a month now. However, I am now stuck at a point. My use case is that I have two or more consumer processes running on different machines. I ran a few tests in which I published 10,000 messages to the Kafka server. Then, while these messages were being processed, I killed one of the consumer processes and restarted it. The consumers were writing processed messages to a file. So after consumption finished, the file contained more than 10,000 messages, meaning some messages were duplicated.

In the consumer process I have disabled auto commit. The consumers manually commit offsets batch-wise. So, for example, if 100 messages are written to the file, the consumer commits the offsets. When a single consumer process is running and it crashes and recovers, duplication is avoided in this manner. But when more than one consumer is running and one of them crashes and recovers, it writes duplicate messages to the file.
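
For reference, a minimal sketch of this manual batch-commit pattern with the newer Java consumer client (which postdates the 0.8-era consumer; the topic name, group id, bootstrap servers, and the file-writing step are just placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

void consumeAndCommitInBatches() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "file-writer");                 // placeholder group id
    props.put("enable.auto.commit", "false");             // auto commit disabled, as described above
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("messages"));   // placeholder topic
        int uncommitted = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                writeToFile(record.value());               // application-specific output step (placeholder)
                uncommitted++;
            }
            if (uncommitted >= 100) {                      // commit roughly every 100 messages
                consumer.commitSync();
                uncommitted = 0;
            }
        }
    }
}

A crash after writeToFile but before commitSync is exactly the window in which the already-written messages get redelivered to another consumer after a rebalance, which matches the duplication observed in the test.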

Is there any effective strategy to avoid these duplicate messages?

Accepted answer by kuujo

The short answer is, no.

What you're looking for is exactly-once processing. While it may often seem feasible, it should never be relied upon because there are always caveats.

Even in order to attempt to prevent duplicates you would need to use the simple consumer. How this approach works is for each consumer, when a message is consumed from some partition, write the partition and offset of the consumed message to disk. When the consumer restarts after a failure, read the last consumed offset for each partition from disk.

But even with this pattern the consumer can't guarantee it won't reprocess a message after a failure. What if the consumer consumes a message and then fails before the offset is flushed to disk? If you write to disk before you process the message, what if you write the offset and then fail before actually processing the message? This same problem would exist even if you were to commit offsets to ZooKeeper after every message.

There are some cases, though, where exactly-once processing is more attainable, but only for certain use cases. This simply requires that your offset be stored in the same location as your application's output. For instance, if you write a consumer that counts messages, by storing the last counted offset with each count you can guarantee that the offset is stored at the same time as the consumer's state. Of course, in order to guarantee exactly-once processing this would require that you consume exactly one message and update the state exactly once for each message, and that's completely impractical for most Kafka consumer applications. By its nature Kafka consumes messages in batches for performance reasons.
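
For illustration only, here is one hedged way that counting-consumer idea could look: the count and the offset it corresponds to are written as a single record and swapped in with an atomic rename, so the state on disk never shows a count without its matching offset (the file name and record format are invented for this sketch):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

class CountingState {
    private final Path stateFile = Paths.get("counter.state");   // hypothetical state file
    private long count;
    private long lastOffset = -1;

    // Count one message; the count and its offset are persisted together, so after a
    // restart the consumer resumes from lastOffset + 1 and never double-counts.
    void countMessage(long offset) throws IOException {
        if (offset <= lastOffset) {
            return;                                               // already reflected in the stored state
        }
        count++;
        lastOffset = offset;
        Path tmp = Paths.get("counter.state.tmp");
        Files.write(tmp, (count + "," + lastOffset).getBytes(StandardCharsets.UTF_8));
        // Atomic rename: readers see either the old (count, offset) pair or the new one, never a mix.
        Files.move(tmp, stateFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }
}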

Usually your time will be better spent, and your application will be much more reliable, if you simply design it to be idempotent.
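
For example, if the output goes to a relational database, idempotency can be as simple as keying the output table on the message's unique id so that replays become no-ops. A hedged sketch, assuming PostgreSQL-style ON CONFLICT and a hypothetical processed_messages table:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Re-delivering the same message is harmless: the second insert hits the
// primary-key conflict on message_id and does nothing.
void writeIdempotently(Connection conn, String messageId, String payload) throws SQLException {
    String sql = "INSERT INTO processed_messages (message_id, payload) "
               + "VALUES (?, ?) ON CONFLICT (message_id) DO NOTHING";
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, messageId);
        ps.setString(2, payload);
        ps.executeUpdate();
    }
}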

Answered by RaGe

This is what the Kafka FAQ has to say on the subject of exactly-once:

How do I get exactly-once messaging from Kafka?

Exactly once semantics has two parts: avoiding duplication during data production and avoiding duplicates during data consumption.

There are two approaches to getting exactly once semantics during data production:

  • Use a single-writer per partition and every time you get a network error check the last message in that partition to see if your last write succeeded
  • Include a primary key (UUID or something) in the message and deduplicate on the consumer.

If you do one of these things, the log that Kafka hosts will be duplicate-free. However, reading without duplicates depends on some co-operation from the consumer too. If the consumer is periodically checkpointing its position then if it fails and restarts it will restart from the checkpointed position. Thus if the data output and the checkpoint are not written atomically it will be possible to get duplicates here as well. This problem is particular to your storage system. For example, if you are using a database you could commit these together in a transaction. The HDFS loader Camus that LinkedIn wrote does something like this for Hadoop loads. The other alternative that doesn't require a transaction is to store the offset with the data loaded and deduplicate using the topic/partition/offset combination.

I think there are two improvements that would make this a lot easier:

  • Producer idempotence could be done automatically and much more cheaply by optionally integrating support for this on the server.
  • The existing high-level consumer doesn't expose a lot of the more fine grained control of offsets (e.g. to reset your position). We will be working on that soon
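
For illustration, the producer side of the second bullet above can be as simple as attaching a unique id to every record so that consumers have something to deduplicate on. A minimal sketch (the topic name and serializer choices are placeholders):

import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    String uniqId = UUID.randomUUID().toString();   // travels with the message as its key
    producer.send(new ProducerRecord<>("events", uniqId, "some payload"));   // "events" is a placeholder topic
    producer.flush();
}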

Answered by peihan

I agree with RaGe's suggestion to deduplicate on the consumer side. We use Redis to deduplicate Kafka messages.

Assume the Message class has a member called 'uniqId', which is filled in by the producer side and is guaranteed to be unique. We use a 12-character random string. (The regexp is '^[A-Za-z0-9]{12}$'.)
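
One possible way to generate an id matching that pattern (just an illustration, not necessarily what the producer in question uses):

import java.security.SecureRandom;

static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
static final SecureRandom RANDOM = new SecureRandom();

// Returns a 12-character string matching ^[A-Za-z0-9]{12}$
static String nextUniqId() {
    StringBuilder sb = new StringBuilder(12);
    for (int i = 0; i < 12; i++) {
        sb.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
    }
    return sb.toString();
}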

The consumer side uses Redis's SETNX to deduplicate and EXPIRE to purge expired keys automatically. Sample code:

Message msg = ... // eg. ConsumerIterator.next().message().fromJson();
Jedis jedis = ... // eg. JedisPool.getResource();
String key = "SPOUT:" + msg.uniqId; // prefix the key at will
String val = Long.toString(System.currentTimeMillis());
long rsps = jedis.setnx(key, val); // returns 1 if the key was newly set, 0 if it already existed
if (rsps <= 0) {
    log.warn("kafka dup: {}", msg.toJson()); // duplicate: skip it (and run any other dedup logic)
} else {
    jedis.expire(key, 7200); // first sighting: keep the key for 2 hours, long enough for production
}

The above code did detect duplicate messages several times when Kafka (version 0.8.x) had issues. With our input/output balance audit log, no messages were lost or duplicated.

Answered by Dean Jain

Whatever is done on the producer side, we believe the best way to deliver exactly once from Kafka is still to handle it on the consumer side (a sketch of steps 2 and 3 follows the list):

  1. Produce the message with a UUID as the Kafka message key into topic T1
  2. On the consumer side, read the message from T1 and write it to HBase with the UUID as the row key
  3. Read it back from HBase with the same row key and write it to another topic T2
  4. Have your end consumers actually consume from topic T2
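
A hedged sketch of what steps 2 and 3 could look like with the HBase client API (the table name, column family, and qualifier are placeholders, and error handling is omitted):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Step 2: write the message keyed by its uuid; a redelivered message simply
// overwrites the same row, so it can only ever surface once per uuid.
static void store(Table table, String uuid, byte[] payload) throws Exception {
    Put put = new Put(Bytes.toBytes(uuid));
    put.addColumn(Bytes.toBytes("m"), Bytes.toBytes("payload"), payload);
    table.put(put);
}

// Step 3: read the row back by the same uuid before forwarding it to topic T2.
static byte[] readBack(Table table, String uuid) throws Exception {
    Result result = table.get(new Get(Bytes.toBytes(uuid)));
    return result.getValue(Bytes.toBytes("m"), Bytes.toBytes("payload"));
}

// Usage sketch:
// try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
//      Table table = conn.getTable(TableName.valueOf("dedup"))) {
//     store(table, msgUuid, msgBytes);
//     byte[] payload = readBack(table, msgUuid);
//     // produce payload to topic T2 here
// }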

Answered by Chris Halcrow

There's a relatively new 'Transactional API' in Kafka now that can allow you to achieve exactly-once processing when processing a stream. With the transactional API, idempotency can be built in, as long as the rest of your system is designed for idempotency. See https://www.baeldung.com/kafka-exactly-once
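
A minimal, hedged sketch of the read-process-write loop with that API (topic names, group id, and transactional.id are placeholders; sendOffsetsToTransaction with consumer group metadata requires a reasonably recent client):

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

Properties cprops = new Properties();
cprops.put("bootstrap.servers", "localhost:9092");
cprops.put("group.id", "relay");                          // placeholder group id
cprops.put("enable.auto.commit", "false");
cprops.put("isolation.level", "read_committed");          // skip records from aborted transactions
cprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
cprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Properties pprops = new Properties();
pprops.put("bootstrap.servers", "localhost:9092");
pprops.put("transactional.id", "relay-1");                // enables idempotence and transactions
pprops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
pprops.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops);
     KafkaProducer<String, String> producer = new KafkaProducer<>(pprops)) {
    producer.initTransactions();
    consumer.subscribe(Collections.singletonList("input"));      // placeholder input topic
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        if (records.isEmpty()) continue;
        producer.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> r : records) {
            // process(...) stands for whatever your application does to each record
            producer.send(new ProducerRecord<>("output", r.key(), process(r.value())));
            offsets.put(new TopicPartition(r.topic(), r.partition()), new OffsetAndMetadata(r.offset() + 1));
        }
        // The consumed offsets are committed inside the same transaction as the produced
        // records, so output and progress become visible together or not at all.
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    }
}

Note that this only covers the consume-transform-produce case where the results go back into Kafka; once the output leaves Kafka (e.g. the file in the question), the idempotent or deduplicating approaches above still apply.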
