java — Is it possible to write Kafka consumer received output to a file using Java

Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute the original authors (not me). Original: http://stackoverflow.com/questions/33606287/

Date: 2020-11-02 21:54:44  Source: igfitidea

Is it possible to write kafka consumer received output to a file using java

java, hadoop, apache-kafka, consumer, producer

Asked by pratik dhamanekar

I have a Kafka producer written in Java that sends Kafka messages, and consumer code that receives these messages.

Is it possible to write these messages received by the consumer to a text file in Java?

Answered by ramblingpolak

If you're writing your own consumer, you should include the logic to write to a file in the same application. With the prepackaged console consumer, you can simply pipe its output to a file. For example: kafka-console-consumer > file.txt
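Spelled out in full, the pipe might look like the command below. The broker address and topic name here are placeholders for illustration; requires a running Kafka broker, so it is not runnable in isolation.

```shell
# Consume from the beginning of the topic and redirect all output to a file
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic my-topic --from-beginning > file.txt
```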

Another (code-free) option would be to try StreamSets Data Collector, an open-source, Apache-licensed tool that also has a drag-and-drop UI. It includes built-in connectors for Kafka and a variety of data formats.

*Full disclosure: I'm a committer on this project.

Answered by pratik dhamanekar

Thanks guys,

I was able to achieve it. Once the data is received on the consumer side, writing it to a file is just ordinary Java code.

Below is the line of code that prints the message to the console.

System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));

You can accumulate all the messages in a String and write them to the file in one go.

System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
completMessage += new String(bytes, "UTF-8")+"\n";

new String(bytes, "UTF-8") contains the actual message.
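As an aside, repeated `+=` on a String copies the whole buffer on every append; the same accumulate-then-write idea can be sketched with a `StringBuilder` instead. The class and method names below are illustrative, not from the original answer:

```java
import java.nio.charset.StandardCharsets;

public class MessageAccumulator {
    private final StringBuilder completMessage = new StringBuilder();

    // Append one decoded Kafka message, mirroring the += line above
    // but without re-copying the accumulated string on every append.
    void append(long offset, byte[] bytes) {
        String message = new String(bytes, StandardCharsets.UTF_8);
        System.out.println(offset + ": " + message);
        completMessage.append(message).append("\n");
    }

    // Returns everything accumulated so far, ready to write to a file.
    String getCompletMessage() {
        return completMessage.toString();
    }
}
```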

Finally, write all the messages to the file.

writeDataToFile(completMessage);

writeDataToFile contains simple Java code that writes a string to a file.
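The answer doesn't show writeDataToFile itself; a minimal sketch might look like the following (the class name and the extra file-path parameter are assumptions, not from the original answer):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class MessageFileWriter {
    // Writes the accumulated message string to the given file,
    // replacing any existing content. Returns false on I/O failure.
    static boolean writeDataToFile(String filePath, String data) {
        try {
            Files.write(Paths.get(filePath), data.getBytes(StandardCharsets.UTF_8));
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }
}
```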

Thank you.

Answered by venkata

It is possible. Below is working code for this.

package com.venk.prac;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import kafka.utils.ShutdownableThread;

public class FileConsumer extends ShutdownableThread {

    private final KafkaConsumer<Integer, String> kafkaConsumer;
    private final String topic;
    private final String filePath;
    private BufferedWriter buffWriter;

    public FileConsumer(String topic, String filePath) {

        super("FileConsumer", false);
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                KafkaProperties.KAFKA_BROKER_SERVERS_PORTS_STRING);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FileConsumer");
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        kafkaConsumer = new KafkaConsumer<Integer, String>(properties);
        this.topic = topic;
        this.filePath = filePath;

        // Subscribe once up front; doWork() is invoked repeatedly by ShutdownableThread.
        kafkaConsumer.subscribe(Collections.singletonList(this.topic));

        try {
            this.buffWriter = new BufferedWriter(new FileWriter(filePath));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void doWork() {
        ConsumerRecords<Integer, String> consumerRecords = kafkaConsumer.poll(1000);
        try {
            // Append each record's value to the file on its own line.
            for (ConsumerRecord<Integer, String> record : consumerRecords)
                buffWriter.write(record.value() + System.lineSeparator());
            buffWriter.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String name() {
        return "FileConsumer";
    }

    @Override
    public boolean isInterruptible() {
        return false;
    }

}