带有结果集的 java.util.stream

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/32209248/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-11 12:10:06  来源:igfitidea点击:

java.util.stream with ResultSet

javajdbclambdajava-streamjooq

提问by Iurii

I have few tables with big amount of data (about 100 million records). So I can't store this data in memory but I would like to stream this result setusing java.util.streamclass and pass this stream to another class. I read about Stream.ofand Stream.Builderoperators but they are buffered streams in memory. So is there any way to resolve this question? Thanks in advance.

我有几个包含大量数据的表(大约 1 亿条记录)。因此,我无法将此数据存储在内存中,但我想使用class流式传输此结果集java.util.stream并将此流传递给另一个类。我读过Stream.ofandStream.Builder运算符,但它们是内存中的缓冲流。那么有没有办法解决这个问题呢?提前致谢。

UPDATE #1

更新 #1

Okay I googled and found jooqlibrary. I'm not sure but looks like it could be applicable to my test case. To summarize I have few tables with big amount of data. I would like to stream my resultset and transfer this stream to another method. Something like this:

好吧,我用谷歌搜索并找到了jooq库。我不确定,但看起来它可能适用于我的测试用例。总而言之,我有几个包含大量数据的表格。我想流式传输我的结果集并将此流传输到另一种方法。像这样的东西:

// why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {

    Stream<Record> record = null;
    try (Connection connection = dataSource.getConnection()) {
        String sql = "select * from " + table;

        try (PreparedStatement pSt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);
            pSt.setFetchSize(5000);
            ResultSet resultSet = pSt.executeQuery();
            //
            record = DSL.using(connection)
                    .fetch(resultSet).stream();
        }
    } catch (SQLException sqlEx) {
        logger.error(sqlEx);
    }

    return record;
}

Could please someone advise, am I on correct way? Thanks.

请有人建议,我在正确的方式吗?谢谢。

UPDATE #2

更新 #2

I made some experiment on jooqand could say now that above decision is not suitable for me. This code record = DSL.using(connection).fetch(resultSet).stream();takes too much time

我在jooq上做了一些实验,现在可以说上述决定不适合我。此代码record = DSL.using(connection).fetch(resultSet).stream();花费太多时间

回答by alfasin

I'm not aware of any well-known library that will do it for you.

我不知道有任何知名的图书馆会为你做这件事。

That said, this articleshows how to wrap the resultset with an Iterator (ResultSetIterator) and pass it as the first parameter to Spliterators.spliteratorUnknownSize()in order to create a Spliterator.

也就是说,本文展示了如何使用迭代器 (ResultSetIterator) 包装结果集并将其作为第一个参数传递给以Spliterators.spliteratorUnknownSize()创建Spliterator.

The Spliterator can then be used by StreamSupportin order to create a Stream on top of it.

然后可以使用 Spliterator 以StreamSupport在其上创建 Stream。

Their suggested implementation of ResultSetIteratorclass:

他们建议的ResultSetIterator类实现:

public class ResultSetIterator implements Iterator {

    private ResultSet rs;
    private PreparedStatement ps;
    private Connection connection;
    private String sql;

    public ResultSetIterator(Connection connection, String sql) {
        assert connection != null;
        assert sql != null;
        this.connection = connection;
        this.sql = sql;
    }

    public void init() {
        try {
            ps = connection.prepareStatement(sql);
            rs = ps.executeQuery();

        } catch (SQLException e) {
            close();
            throw new DataAccessException(e);
        }
    }

    @Override
    public boolean hasNext() {
        if (ps == null) {
            init();
        }
        try {
            boolean hasMore = rs.next();
            if (!hasMore) {
                close();
            }
            return hasMore;
        } catch (SQLException e) {
            close();
            throw new DataAccessException(e);
        }

    }

    private void close() {
        try {
            rs.close();
            try {
                ps.close();
            } catch (SQLException e) {
                //nothing we can do here
            }
        } catch (SQLException e) {
            //nothing we can do here
        }
    }

    @Override
    public Tuple next() {
        try {
            return SQL.rowAsTuple(sql, rs);
        } catch (DataAccessException e) {
            close();
            throw e;
        }
    }
}

and then:

进而:

public static Stream stream(final Connection connection, 
                                       final String sql, 
                                       final Object... parms) {
  return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(
                        new ResultSetIterator(connection, sql), 0), false);
}

回答by Holger

The first thing you have to understand is that code like

您必须了解的第一件事是这样的代码

try (Connection connection = dataSource.getConnection()) {
    …
    try (PreparedStatement pSt = connection.prepareStatement(sql)) {
        …
        return stream;
    }
}

does not work as by the time you leave the tryblocks, the resources are closed while the processing of the Streamhasn't even started.

不工作,因为当你离开try块时,资源被关闭,而处理Stream甚至还没有开始。

The resource management construct “try with resources” works for resources used within a block scope inside a method but you are creating a factory method returning a resource. Therefore you have to ensure that the closing of the returned stream will close the resources and the caller is responsible for closing the Stream.

资源管理构造“尝试使用资源”适用于方法内部块范围内使用的资源,但您正在创建返回资源的工厂方法。因此,您必须确保关闭返回的流将关闭资源并且调用者负责关闭Stream.



Further, you need a function which produces an item out of a single line from the ResultSet. Supposing, you have a method like

此外,您需要一个从ResultSet. 假设,你有一个像

Record createRecord(ResultSet rs) {
    …
}

you may create a Stream<Record>basically like

你可以创建一个Stream<Record>基本上喜欢

Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
    Long.MAX_VALUE,Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super Record> action) {
            if(!resultSet.next()) return false;
            action.accept(createRecord(resultSet));
            return true;
        }
    }, false);

But to do it correctly you have to incorporate the exception handling and closing of resources. You can use Stream.onCloseto register an action that will be performed when the Streamgets closed, but it has to be a Runnablewhich can not throw checked exceptions. Similarly the tryAdvancemethod is not allowed to throw checked exceptions. And since we can't simply nest try(…)blocks here, the program logic of suppression exceptions thrown in close, when there is already a pending exception, doesn't come for free.

但是要正确地做到这一点,您必须合并异常处理和资源关闭。您可以使用Stream.onClose注册将Stream在关闭时执行的操作,但它必须是Runnable不能抛出已检查异常的操作。同样,该tryAdvance方法不允许抛出已检查的异常。并且由于我们不能try(…)在这里简单地嵌套块close,当已经有待处理的异常时,抑制抛出异常的程序逻辑并不是免费的。

To help us here, we introduce a new type which can wrap closing operations which may throw checked exceptions and deliver them wrapped in an unchecked exception. By implementing AutoCloseableitself, it can utilize the try(…)construct to chain close operations safely:

为了帮助我们,我们引入了一种新类型,它可以包装可能抛出已检查异常的关闭操作,并将它们包装在未检查异常中。通过实现AutoCloseable自身,它可以利用该try(…)构造安全地链接关闭操作:

interface UncheckedCloseable extends Runnable, AutoCloseable {
    default void run() {
        try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
    }
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
    default UncheckedCloseable nest(AutoCloseable c) {
        return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
    }
}

With this, the entire operation becomes:

这样,整个操作就变成了:

private Stream<Record> tableAsStream(DataSource dataSource, String table)
    throws SQLException {

    UncheckedCloseable close=null;
    try {
        Connection connection = dataSource.getConnection();
        close=UncheckedCloseable.wrap(connection);
        String sql = "select * from " + table;
        PreparedStatement pSt = connection.prepareStatement(sql);
        close=close.nest(pSt);
        connection.setAutoCommit(false);
        pSt.setFetchSize(5000);
        ResultSet resultSet = pSt.executeQuery();
        close=close.nest(resultSet);
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
            Long.MAX_VALUE,Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super Record> action) {
                try {
                    if(!resultSet.next()) return false;
                    action.accept(createRecord(resultSet));
                    return true;
                } catch(SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false).onClose(close);
    } catch(SQLException sqlEx) {
        if(close!=null)
            try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
        throw sqlEx;
    }
}

This method wraps the necessary close operation for all resources, Connection, Statementand ResultSetwithin one instance of the utility class described above. If an exception happens during the initialization, the close operation is performed immediately and the exception is delivered to the caller. If the stream construction succeeds, the close operation is registered via onClose.

此方法包装必要关闭操作的所有资源,ConnectionStatementResultSet上面描述的工具类的一个实例内。如果在初始化过程中发生异常,则立即执行关闭操作并将异常传递给调用者。如果流构造成功,则关闭操作通过 注册onClose

Therefore the caller has to ensure proper closing like

因此,调用者必须确保正确关闭,例如

try(Stream<Record> s=tableAsStream(dataSource, table)) {
    // stream operation
}

Note that also the delivery of an SQLExceptionvia RuntimeExceptionhas been added to the tryAdvancemethod. Therefore you may now add throws SQLExceptionto the createRecordmethod without problems.

请注意,该方法还添加了过SQLException孔的交付。因此,您现在可以毫无问题地添加到方法中。RuntimeExceptiontryAdvancethrows SQLExceptioncreateRecord

回答by Lukas Eder

jOOQ

约克

I'm going to answer the jOOQpart of your question. As of jOOQ 3.8, there have now been quite a few additional features related to combining jOOQ with Stream. Other usages are also documented on this jOOQ page.

我将回答您问题的jOOQ部分。从 jOOQ 3.8 开始,现在有很多与将 jOOQ 与 Stream 相结合相关的附加功能。此 jOOQ 页面上还记录了其他用法

Your suggested usage:

您的建议用法:

You tried this:

你试过这个:

Stream<Record> stream = DSL.using(connection).fetch(resultSet).stream();

Indeed, this doesn't work well for large result sets because fetch(ResultSet)fetches the entire result set into memory and then calls Collection.stream()on it.

实际上,这对于大型结果集效果不佳,因为fetch(ResultSet)将整个结果集提取到内存中,然后对其进行调用Collection.stream()

Better (lazy) usage:

更好的(懒惰的)用法:

Instead, you could write this:

相反,你可以这样写:

try (Stream<Record> stream = DSL.using(connection).fetchStream(resultSet)) {
    ...
}

... which is essentially convenience for this:

...这本质上是方便的:

try (Cursor<Record> cursor = DSL.using(connection).fetchLazy(resultSet)) {
    Stream<Record> stream = cursor.stream();
    ...
}

See also DSLContext.fetchStream(ResultSet)

也可以看看 DSLContext.fetchStream(ResultSet)

Of course, you could also let jOOQ execute your SQL string, rather than wrestling with JDBC:

当然,您也可以让 jOOQ 执行您的 SQL 字符串,而不是与 JDBC 搏斗:

try (Stream<Record> stream = 
     DSL.using(dataSource)
        .resultQuery("select * from {0}", DSL.name(table)) // Prevent SQL injection
        .fetchSize(5000)
        .fetchStream()) {
    ...
}

On try-with-resources usage

关于 try-with-resources 的使用

Do note that a Streamproduced by jOOQ is "resourceful", i.e. it contains a reference to an open ResultSet(and PreparedStatement). So, if you really want to return that stream outside of your method, make sure it is closed properly!

请注意,Stream由 jOOQ 生成的 a 是“资源丰富的”,即它包含对 open ResultSet(和PreparedStatement)的引用。因此,如果您真的想在方法之外返回该流,请确保它已正确关闭!

回答by user_3380739

Here is the simplest sample by abacus-jdbc.

这是abacus-jdbc的最简单示例。

final DataSource ds = JdbcUtil.createDataSource(url, user, password);
final SQLExecutor sqlExecutor = new SQLExecutor(ds);
sqlExecutor.stream(sql, parameters).filter(...).map(...).collect(...) // lazy execution&loading and auto-close Statement/Connection

Or:

或者:

JdbcUtil.prepareQuery(ds, sql).filter(...).map(...).collect(...)  // lazy execution&loading and auto-close Statement/Connection

This is totally lazy loading and auto-closure. The records will loaded from db by fetch size(default if not specified) and the Statement and Connection will automatically closed after the result/records are collected.

这完全是延迟加载和自动关闭。记录将从 db 加载fetch size(如果未指定,则为默认值),并且在收集结果/记录后,语句和连接将自动关闭。

Disclosure: I'm the developer of AbacusUtil.

披露: 我是 AbacusUtil 的开发者。

回答by user2276550

I just did the summary to provide the real example about how to stream ResultSet and do the simple SQL query without using 3rd click here for detail

我只是做了总结,以提供有关如何流式传输 ResultSet 和执行简单 SQL 查询而不使用第 3 次单击此处了解详细信息的真实示例

Blockquote: Java 8 provided the Stream family and easy operation of it. The way of pipeline usage made the code clear and smart. However, ResultSet is still go with very legacy way to process. Per actual ResultSet usage, it is really helpful if converted as Stream.

Blockquote:Java 8 提供了 Stream 家族并且易于操作。管道的使用方式使代码清晰而智能。但是,ResultSet 仍然采用非常传统的处理方式。根据实际的 ResultSet 用法,如果转换为 Stream,那真的很有帮助。

.... StreamUtils.uncheckedConsumer is required to convert the the SQLException to runtimeException to make the Lamda clear.

.... StreamUtils.uncheckedConsumer 需要将 SQLException 转换为 runtimeException 以使 Lamda 清晰。

回答by Anatoly

Using my library it would be done like this:

使用我的库,它会是这样完成的:

attach maven dependency:

附加maven依赖:

<dependency>
    <groupId>com.github.buckelieg</groupId>
    <artifactId>db-fn</artifactId>
    <version>0.3.4</version>
</dependency>

use library in code:

在代码中使用库:

Function<Stream<I>, O> processor = stream -> //process input stream
try (DB db = new DB("jdbc:postgresql://host:port/database?user=user&password=pass")) {
    processor.apply(
        db.select("SELECT * FROM my_table t1 JOIN my_table t2 ON t1.id = t2.id")
          .fetchSize(5000)
          .execute(rs -> /*ResultSet mapper*/)
    );
}

See more here

在这里查看更多

回答by pop

Some common module called Toolsof a Ujormframework offers a simple solution using the RowIteratorclass. Example of use:

一些称为Ujorm框架工具的通用模块提供了使用该类的简单解决方案。使用示例:RowIterator

    PreparedStatement ps = dbConnection.prepareStatement("SELECT * FROM myTable");
    new RowIterator(ps).toStream().forEach((RsConsumer)(resultSet) -> {
        int value = resultSet.getInt(1);
    });

Maven dependency on the Tools library (50KB):

Maven 依赖于 Tools 库 (50KB):

    <dependency>
        <groupId>org.ujorm</groupId>
        <artifactId>ujo-tools</artifactId>
        <version>1.93</version>
    </dependency>

See jUnit testfor more information.

有关更多信息,请参阅jUnit 测试