java: Spring Integration Inbound-Channel-Adapter to read large files line-by-line

Notice: this page is based on a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverflow original: http://stackoverflow.com/questions/27064737/

Date: 2020-11-02 11:09:02  Source: igfitidea

Spring Integration Inbound-Channel-Adapter to read large files line-by-line

Tags: java, spring, spring-batch, spring-integration

Asked by Tony Falabella

I'm currently using Spring Integration 4.1.0 with Spring 4.1.2. I have a requirement to read a file line-by-line and use each line read as a message. Basically I want to allow "replay" for one of our message sources, but the messages are not saved in individual files but rather in a single file. I have no transaction requirements for this use-case. My requirements are similar to this posting, except that the file resides on the same server as the one the JVM is running on: spring integration - read a remote file line by line

As I see it I have the following options:

1. Use int-file:inbound-channel-adapter to read the file, then "split" that file so that one message becomes multiple messages. Sample config file:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

        <int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
        <int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
        <int:channel id="channel1"/>
        <int:splitter input-channel="channel2" output-channel="nullChannel"/>
        <int:channel id="channel2"/>
    </beans>

The problem is that the file is very large and when using the above technique the entire file is first read into memory and is then split and the JVM runs out of heap space. Really the steps required are: read a line and convert line to message, send message, remove message from memory, repeat.
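The read/send/discard loop described above can be sketched in plain Java, independent of Spring Integration. This is only an illustration of the memory behavior being asked for: the class name LineStreamer and its method are hypothetical, and it assumes Java 8 and a UTF-8 encoded file. The handler stands in for whatever sends the message.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/** Hypothetical helper, not part of Spring Integration. */
final class LineStreamer {

    private LineStreamer() {
    }

    /**
     * Reads the file one line at a time and hands each line to the handler.
     * Only the current line (plus the reader's internal buffer) is held in
     * memory, so heap use does not grow with the size of the file.
     *
     * @return the number of lines passed to the handler
     */
    static long streamLines(Path file, Consumer<String> handler) throws IOException {
        long count = 0;
        // try-with-resources closes the reader even if the handler throws
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                handler.accept(line); // "send message"; the line becomes garbage afterwards
                count++;
            }
        }
        return count;
    }
}
```

A caller would pass something like `line -> channel.send(...)` as the handler; here that part is left to the surrounding application.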

2. Use int-file:tail-inbound-channel-adapter with end="false" (which basically tells it to read from the start of the file). Start and stop this adapter as needed for each file (changing the filename before each start). Sample config file:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    
        <int-file:tail-inbound-channel-adapter id="apache"
            channel="exchangeSpringQueueChannel"
            task-executor="exchangeFileReplayTaskExecutor"
            file="C:\p2-test.txt"
            delay="1"
            end="false"
            reopen="true"
            file-delay="10000" />
    
        <int:channel id="exchangeSpringQueueChannel" />
        <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
    </beans>
    
3. Have Spring Integration call into Spring Batch and use an ItemReader to process the file. This certainly allows more fine-grained control over the whole process, but it is a fair amount of work to set up, what with the job repository and such (and I don't care about the job history, so I'd either tell the job not to log status and/or use the in-memory MapJobRepository).

4. Create my own FileLineByLineInboundChannelAdapter by extending MessageProducerSupport. Much of the code can be borrowed from ApacheCommonsFileTailingMessageProducer (also see http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter). Below is a sample, but it needs some work to move the reading into its own Thread so that I honor the stop() command while I read line-by-line.

    package com.xxx.exchgateway.common.util.springintegration;

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import org.apache.commons.io.IOUtils;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.endpoint.MessageProducerSupport;
    import org.springframework.integration.file.FileHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.util.Assert;

    /**
     * A lot of the logic for this class came from
     * {@link org.springframework.integration.file.tail.ApacheCommonsFileTailingMessageProducer}.
     * See <a href="http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter">this forum thread</a>.
     */
    public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
        private volatile File file;

        /**
         * The name of the file you wish to tail.
         * @param file The absolute path of the file.
         */
        public void setFile(File file) {
            Assert.notNull(file, "'file' cannot be null");
            this.file = file;
        }

        protected File getFile() {
            if (this.file == null) {
                throw new IllegalStateException("No 'file' has been provided");
            }
            return this.file;
        }

        @Override
        public String getComponentType() {
            return "file:line-by-line-inbound-channel-adapter";
        }

        private void readFile() {
            FileInputStream fstream;
            try {
                fstream = new FileInputStream(getFile());

                BufferedReader br = new BufferedReader(new InputStreamReader(fstream));

                String strLine;

                // Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
                while ((strLine = br.readLine()) != null && isRunning()) {
                    send(strLine);
                }

                //Close the input stream
                IOUtils.closeQuietly(br);
                IOUtils.closeQuietly(fstream);
            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        @Override
        protected void doStart() {
            super.doStart();

            // TODO this needs to be moved into its own thread since isRunning() will return "false" until this method has completed
            // and we want to honor the stop() command while we read line-by-line
            readFile();
        }

        protected void send(String line) {
            Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
            super.sendMessage(message);
        }

        @Override
        public Message<String> receive() {
            // TODO Auto-generated method stub
            return null;
        }
    }
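The TODO in doStart() (moving the read loop onto its own thread so that stop() is honored between lines) can be sketched without any Spring types. Everything here is an assumption made for illustration: BackgroundLineReader is a hypothetical class, the Consumer stands in for send(), and Java 8 and UTF-8 input are assumed. It is not the adapter itself, only the threading pattern it would need.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical stand-in for the adapter's start()/stop() lifecycle. */
final class BackgroundLineReader {
    private final Path file;
    private final Consumer<String> sink; // plays the role of send(line)
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private AtomicBoolean active; // flag of the most recent run, guarded by "this"
    private Future<?> task;       // guarded by "this"

    BackgroundLineReader(Path file, Consumer<String> sink) {
        this.file = file;
        this.sink = sink;
    }

    /** Returns immediately; the file is read on the executor thread. */
    synchronized void start() {
        if (active != null && active.get()) {
            return; // a run is already in progress
        }
        AtomicBoolean flag = new AtomicBoolean(true); // one flag per run
        active = flag;
        task = executor.submit(() -> readLoop(flag));
    }

    /** Asks the current run to finish after the line it is processing. */
    synchronized void stop() {
        if (active != null) {
            active.set(false);
        }
    }

    /** Blocks until the most recently started run has ended. */
    void awaitCompletion() throws InterruptedException, ExecutionException {
        Future<?> current;
        synchronized (this) {
            current = task;
        }
        if (current != null) {
            current.get();
        }
    }

    void shutdown() {
        stop();
        executor.shutdown();
    }

    private void readLoop(AtomicBoolean flag) {
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            // The flag is checked before reading, so no line is read and then dropped.
            while (flag.get() && (line = reader.readLine()) != null) {
                sink.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            flag.set(false); // the run is over, whether it finished or was stopped
        }
    }
}
```

Using a separate flag per run keeps a quick stop() followed by start() from reviving the old loop; the single-thread executor then runs the new read only after the old one has exited.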

It doesn't seem to me that my use-case is outside the realm of typical things people might like to do, so I'm surprised that I can't find an out-of-the-box solution for it. I've searched quite a bit and looked at a lot of the examples, however, and unfortunately have yet to find something that suits my needs.

I'm assuming that perhaps I've missed something obvious that the framework already offers (though perhaps this falls into the blurry line between Spring Integration and Spring Batch). Can someone let me know if I'm totally off-base with my ideas, or if there's a simple solution that I've missed, or offer alternative suggestions?

Accepted answer by Sergey Shcherbakov

Spring Integration 4.x has a nice new feature of using Iterators as messages:

Spring Integration Reference

Starting with version 4.1, the AbstractMessageSplitter supports the Iterator type for the value to split.

This makes it possible to send an Iterator as the message payload without reading the whole file into memory.

Here is a simple example of a Spring Context splitting CSV files into one message per line:

<int-file:inbound-channel-adapter 
        directory="${inputFileDirectory:/tmp}"
        channel="inputFiles"/>

<int:channel id="inputFiles">
    <int:dispatcher task-executor="executor"/>
</int:channel>

<int:splitter 
    input-channel="inputFiles" 
    output-channel="output">
    <bean 
        class="FileSplitter" 
        p:commentPrefix="${commentPrefix:#}" />
</int:splitter>

<task:executor 
    id="executor" 
    pool-size="${poolSize:8}" 
    queue-capacity="${queueCapacity:0}" 
    rejection-policy="CALLER_RUNS" />

<int:channel id="output"/>

And this is the splitter implementation:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class FileSplitter extends AbstractMessageSplitter {
    private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);

    private String commentPrefix = "#";

    public Object splitMessage(Message<?> message) {
        if(log.isDebugEnabled()) {
            log.debug(message.toString());
        }
        try {

            Object payload = message.getPayload();
            Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload"); 

            return new BufferedReaderFileIterator((File) payload);
        } 
        catch (IOException e) {
            String msg = "Unable to transform file: " + e.getMessage();
            log.error(msg);
            throw new MessageTransformationException(msg, e);
        }
    }

    public void setCommentPrefix(String commentPrefix) {
        this.commentPrefix = commentPrefix;
    }

    public class BufferedReaderFileIterator implements Iterator<String> {

        private File file;
        private BufferedReader bufferedReader;
        private String line;

        public BufferedReaderFileIterator(File file) throws IOException {
            this.file = file;
            this.bufferedReader = new BufferedReader(new FileReader(file));
            readNextLine();
        }

        @Override
        public boolean hasNext() {
            return line != null;
        }

        @Override
        public String next() {
            try {
                String res = this.line;
                readNextLine();
                return res;
            } 
            catch (IOException e) {
                log.error("Error reading file", e);
                throw new RuntimeException(e);
            }   
        }

        void readNextLine() throws IOException {
            do {
                line = bufferedReader.readLine();
            }
            while(line != null && line.trim().startsWith(commentPrefix));

            if(log.isTraceEnabled()) {
                log.trace("Read next line: {}", line);
            }

            if(line == null) {
                close();
            }
        }

        void close() throws IOException {
            bufferedReader.close();
            file.delete();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

    }

}

Please note the Iterator object being returned from the splitMessage() handler method.
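To make the laziness concrete, here is a dependency-free sketch of the same pull-style idea: each call to next() reads only as far as the next non-comment line. The class name LazyLines is hypothetical, and unlike the inner class above it takes any Reader, never deletes anything, and throws NoSuchElementException when exhausted. Java 8 is assumed for UncheckedIOException.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical lazy line iterator; one line is buffered ahead, nothing more. */
final class LazyLines implements Iterator<String>, Closeable {
    private final BufferedReader reader;
    private final String commentPrefix;
    private String next; // the line that the next call to next() will return

    LazyLines(Reader source, String commentPrefix) {
        this.reader = new BufferedReader(source);
        this.commentPrefix = commentPrefix;
        advance();
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public String next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        String result = next;
        advance();
        return result;
    }

    /** Reads forward to the next line that is not a comment, or to end of input. */
    private void advance() {
        try {
            do {
                next = reader.readLine();
            } while (next != null && next.trim().startsWith(commentPrefix));
            if (next == null) {
                reader.close(); // end of input: release the underlying resource
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Allows a consumer to stop early without leaking the reader. */
    @Override
    public void close() throws IOException {
        next = null;
        reader.close();
    }
}
```

A consumer simply loops with hasNext()/next(); at no point is more than one pending line held, which is the property the splitter relies on.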

Answer by Usman Yaqoob

I am doing this as well: I copy the files to another folder and also read data from each file.

fileCopyApplicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:file="http://www.springframework.org/schema/integration/file"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file
            http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/context 
            http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder />

    <file:inbound-channel-adapter id="filesIn"
        directory="E:/usmandata/logs/input/" filter="onlyPropertyFiles"
        auto-startup="true">
        <int:poller id="poller" fixed-delay="500" />
    </file:inbound-channel-adapter>



    <int:service-activator input-channel="filesIn"
        output-channel="filesOut" ref="handler" />

    <file:outbound-channel-adapter id="filesOut"
        directory="E:/usmandata/logs/output/" />




    <bean id="handler" class="com.javarticles.spring.integration.file.FileHandler" />
    <bean id="onlyPropertyFiles"
        class="org.springframework.integration.file.config.FileListFilterFactoryBean"
        p:filenamePattern="*.log" />
</beans>

FileHandler.java

package com.javarticles.spring.integration.file;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class FileHandler {
    public File handleFile(File input) throws IOException {
       // System.out.println("Copying file: " + input.getAbsolutePath());


        RandomAccessFile file = new RandomAccessFile(input,"r");

        FileChannel channel = file.getChannel();

        //System.out.println("File size is: " + channel.size());

        ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());

        channel.read(buffer);

        buffer.flip();//Restore buffer to position 0 to read it

        System.out.println("Reading content and printing ... ");

        for (int i = 0; i < channel.size(); i++) {
            System.out.print((char) buffer.get());
        }

        channel.close();
        file.close();
        return input;
    }
}
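The handler above allocates a buffer as large as the whole file, which is fine for small logs but not for the very large files the question is about. A minimal sketch of the same FileChannel read using a fixed-size buffer follows; the class name ChunkedFileDump and the bufferSize parameter are hypothetical, and the bytes are written through unchanged rather than cast to char one by one.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper: streams a file to an OutputStream in fixed-size chunks. */
final class ChunkedFileDump {

    private ChunkedFileDump() {
    }

    /**
     * Copies the file's bytes to {@code out} using one reusable buffer, so
     * memory use is bounded by bufferSize rather than by the file size.
     *
     * @return the total number of bytes copied
     */
    static long dump(Path file, OutputStream out, int bufferSize) throws IOException {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        long total = 0;
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize); // heap buffer, backed by an array
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            while (channel.read(buffer) != -1) {
                buffer.flip(); // switch from filling the buffer to draining it
                int n = buffer.remaining();
                out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), n);
                total += n;
                buffer.clear(); // reuse the same buffer for the next chunk
            }
        }
        out.flush();
        return total;
    }
}
```

Calling `ChunkedFileDump.dump(input.toPath(), System.out, 8192)` from handleFile would print the content without ever holding more than one chunk in memory.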

SpringIntegrationFileCopyExample.java

package com.javarticles.spring.integration.file;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringIntegrationFileCopyExample {

    public static void main(String[] args) throws InterruptedException, IOException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                "fileCopyApplicationContext.xml");

    }

}