Java: How to put data from a server into a Kinesis Stream

Disclaimer: this content is translated from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/24343445/

How to put data from server to Kinesis Stream

Tags: java, amazon-web-services, rabbitmq, amazon-kinesis

Asked by Sam

I am new to Kinesis. Reading the documentation, I found that I can create a Kinesis stream to get data from a producer, and then use the KCL to read this data from the stream for further processing. I understand how to write the KCL application by implementing IRecordProcessor.

However, the very first stage, how to put data onto the Kinesis stream, is still not clear to me. Is there an AWS API I need to implement against to achieve this?

Scenario: I have a server which is continuously receiving data from various sources into folders. Each folder contains text files whose rows hold the attributes required for further analytical work. I have to push all of this data to a Kinesis stream.

I need code something like the class below, whose putData method will be used to put records onto the Kinesis stream:

import java.nio.ByteBuffer;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;

public class Put {

    AmazonKinesisClient kinesisClient;

    Put() {
        String accessKey = "My Access Key here";
        String secretKey = "My Secret Key here";
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
        kinesisClient = new AmazonKinesisClient(credentials);
        kinesisClient.setEndpoint("kinesis.us-east-1.amazonaws.com", "kinesis", "us-east-1");
        System.out.println("starting the Put Application");
    }

    public void putData(String fileContent, String session) throws Exception {
        final String myStreamName = "ClickStream";

        // Build the request: stream name, payload bytes, and a partition key
        // that determines which shard the record is written to.
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(myStreamName);
        putRecordRequest.setData(ByteBuffer.wrap(fileContent.getBytes("UTF-8")));
        putRecordRequest.setPartitionKey("session" + session);

        PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
        System.out.println("Successfully put record, partition key: " + putRecordRequest.getPartitionKey()
                + ", shard ID: " + putRecordResult.getShardId());
        System.out.println(fileContent);
        System.out.println("Sequence number: " + putRecordResult.getSequenceNumber());
        System.out.println("Data has been PUT successfully");
    }
}

However, when reading files from the source folder on the server, what design should I use to call putData to get the records onto the Kinesis stream? Do I need an infinite loop that reads all the files, or is there some framework that will do this better, taking care of fault tolerance and single points of failure? Any help would be greatly appreciated.

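For example, the naive design I can imagine is a polling loop like the sketch below; the folder paths, the poll interval, and the move-aside step are placeholders, not a recommended solution:

import java.io.File;
import java.nio.file.Files;

public class FolderPoller {
    public static void main(String[] args) throws Exception {
        Put put = new Put();
        File sourceDir = new File("/data/incoming");  // placeholder source folder
        File doneDir = new File("/data/processed");   // placeholder archive folder

        while (true) {  // the infinite loop I am unsure about
            File[] files = sourceDir.listFiles();
            if (files != null) {
                for (File f : files) {
                    if (!f.isFile()) continue;
                    String content = new String(Files.readAllBytes(f.toPath()), "UTF-8");
                    put.putData(content, f.getName());  // use the file name as the session key
                    // Move the file aside so it is not read and pushed again.
                    Files.move(f.toPath(), new File(doneDir, f.getName()).toPath());
                }
            }
            Thread.sleep(5000);  // poll the folder every 5 seconds
        }
    }
}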

Briefly: I need a better technique to put regularly generated data onto the Kinesis stream; the data arrives on the server at regular intervals. Thanks.

Accepted answer by Dan Ciborowski - MSFT

So it seems you are already using... http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html

The specific method you want is putRecord, as follows.

You need a stream name, a record, and a partition key. http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/PutRecordResult.html

But it seems you have all this?

You would then need a program that is always running, tailing your server log file, and whenever there is a new line, pushing it to the stream.

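A bare-bones sketch of such a tailer, assuming the Put class from the question; the log path and poll interval are illustrative, and log rotation is not handled:

import java.io.BufferedReader;
import java.io.FileReader;

public class LogTailer {
    public static void main(String[] args) throws Exception {
        Put put = new Put();
        long session = 0;
        // Keep the reader open; readLine returns null at end-of-file,
        // and later calls pick up lines appended after that point.
        BufferedReader reader = new BufferedReader(new FileReader("/var/log/app.log"));
        while (true) {
            String line = reader.readLine();
            if (line != null) {
                put.putData(line, String.valueOf(session++));  // one Kinesis record per line
            } else {
                Thread.sleep(1000);  // no new data yet; wait and poll again
            }
        }
    }
}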

But your data will only sit there for 24 hours (the default stream retention period). You then need a worker program to consume the data and place it in some other AWS resource.

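For illustration only, a minimal consumer using the raw API might look like the sketch below; the shard ID, stream name, and sleep interval are assumptions, and the KCL worker the question mentions is the more robust way to do this:

import java.util.List;

import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {
        AmazonKinesisClient kinesisClient = new AmazonKinesisClient();  // default credential chain
        // Start from the oldest available record in a single, known shard.
        String shardIterator = kinesisClient.getShardIterator(
                "ClickStream", "shardId-000000000000", "TRIM_HORIZON").getShardIterator();

        while (shardIterator != null) {
            GetRecordsResult result = kinesisClient.getRecords(
                    new GetRecordsRequest().withShardIterator(shardIterator).withLimit(100));
            List<Record> records = result.getRecords();
            for (Record r : records) {
                // Hand each record off to some other store (S3, DynamoDB, ...).
                System.out.println(new String(r.getData().array(), "UTF-8"));
            }
            shardIterator = result.getNextShardIterator();
            Thread.sleep(1000);  // stay under the per-shard read limits
        }
    }
}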

Answer by az3

If you are tailing some files, try Fluentd. http://www.fluentd.org/

Amazon Kinesis has a pretty nice plugin for that. https://github.com/awslabs/aws-fluent-plugin-kinesis

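A hedged sketch of what the plugin configuration might look like; parameter names vary across plugin versions (this follows the older "type kinesis" style), and the tag, stream name, and credentials are placeholders:

<match app.clickstream>
  type kinesis
  stream_name ClickStream
  region us-east-1
  aws_key_id YOUR_ACCESS_KEY
  aws_sec_key YOUR_SECRET_KEY
  partition_key session
</match>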

Answer by Kazuki Ohta

If you're trying to ingest log files, please try Fluentd. Fluentd can tail log files continuously and handle data buffering, encryption, compression, and retries.

Fluentd's Kinesis plugin is developed by Amazon Web Services itself.

Answer by prasadvk

You can use the Amazon Kinesis Agent to monitor a set of files and stream their data to Kinesis.

http://docs.aws.amazon.com/streams/latest/dev/writing-with-agents.html

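For reference, the agent reads a JSON configuration file (by default /etc/aws-kinesis/agent.json); a minimal flow mapping a file pattern onto a stream looks roughly like this, with the path and stream name as placeholders:

{
  "flows": [
    {
      "filePattern": "/data/incoming/*.txt",
      "kinesisStream": "ClickStream"
    }
  ]
}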