Spring webflux 和从数据库读取

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/42299455/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-08 01:21:07  来源:igfitidea点击:

Spring webflux and reading from database

springreactive-programming

提问by Lukasz

Spring 5 introduces the reactive programming style for rest APIs with webflux. I'm fairly new to it myself and was wondering wether wrapping synchronous calls to a database into Fluxor Monomakes sense preformence-wise? If yes, is this the way to do it:

Spring 5 为带有webflux 的rest API 引入了反应式编程风格。我自己对它还很陌生,想知道是将数据库的同步调用包装到Flux还是Mono在性能方面有意义?如果是,这是这样做的方法:

@RestController
public class HomeController {

    private MeasurementRepository repository;

    public HomeController(MeasurementRepository repository){
        this.repository = repository;
    }

    @GetMapping(value = "/v1/measurements")
    public Flux<Measurement> getMeasurements() {
        return Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L)));
    }

}

Is there something like an asynchronous CrudRepository? I couldn't find it.

有没有像异步 CrudRepository 这样的东西?我找不到。

回答by Grygoriy Gonchar

One option would be to use alternative SQL clients that are fully non-blocking. Some examples include: https://github.com/mauricio/postgresql-asyncor https://github.com/finagle/roc. Of course, none of these drivers is officially supported by database vendors yet. Also, functionality is way much less attractive comparing to mature JDBC-based abstractions such as Hibernate or jOOQ.

一种选择是使用完全非阻塞的替代 SQL 客户端。一些示例包括:https: //github.com/mauricio/postgresql-asynchttps://github.com/finagle/roc。当然,这些驱动程序都没有得到数据库供应商的正式支持。此外,与成熟的基于 JDBC 的抽象(如 Hibernate 或 jOOQ)相比,功能的吸引力要小得多。

The alternative idea came to me from Scala world. The idea is to dispatch blocking calls into isolated ThreadPool not to mix blocking and non-blocking calls together. This will allow us to control the overall number of threads and will let the CPU serve non-blocking tasks in the main execution context with some potential optimizations. Assuming that we have JDBC based implementation such as Spring Data JPA which is indeed blocking, we can make it's execution asynchronous and dispatch on the dedicated thread pool.

另一种想法来自 Scala 世界。这个想法是将阻塞调用分派到隔离的 ThreadPool 中,而不是将阻塞调用和非阻塞调用混合在一起。这将允许我们控制线程的总数,并让 CPU 在主执行上下文中为非阻塞任务提供一些潜在的优化。假设我们有基于 JDBC 的实现,例如确实是阻塞的 Spring Data JPA,我们可以使其执行异步并在专用线程池上分派。

@RestController
public class HomeController {

    private final MeasurementRepository repository;
    private final Scheduler scheduler;

    public HomeController(MeasurementRepository repository, @Qualifier("jdbcScheduler") Scheduler scheduler) {
        this.repository = repository;
        this.scheduler = scheduler;
    }

    @GetMapping(value = "/v1/measurements")
    public Flux<Measurement> getMeasurements() {
        return Mono.fromCallable(() -> repository.findByFromDateGreaterThanEqual(new Date(1486980000L))).publishOn(scheduler);
    }

}

Our Scheduler for JDBC should be configured by using dedicated Thread Pool with size count equal to the number of connections.

我们的 JDBC 调度程序应该使用大小计数等于连接数的专用线程池进行配置。

@Configuration
public class SchedulerConfiguration {
    private final Integer connectionPoolSize;

    public SchedulerConfiguration(@Value("${spring.datasource.maximum-pool-size}") Integer connectionPoolSize) {
        this.connectionPoolSize = connectionPoolSize;
    }

    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    }

}

However, there are difficulties with this approach. The main one is transaction management. In JDBC, transactions are possible only within a single java.sql.Connection. To make several operations in one transaction, they have to share a connection. If we want to make some calculations in between them, we have to keep the connection. This is not very effective, as we keep a limited number of connections idle while doing calculations in between.

但是,这种方法存在困难。主要是事务管理。在 JDBC 中,事务只能在单个 java.sql.Connection 中进行。要在一个事务中进行多个操作,它们必须共享一个连接。如果我们想在它们之间进行一些计算,我们必须保持连接。这不是很有效,因为我们在两者之间进行计算时保持有限数量的连接空闲。

This idea of an asynchronous JDBC wrapper is not new and is already implemented in Scala library Slick 3. Finally, non-blocking JDBC may come along on the Java roadmap. As it was announced at JavaOne in September 2016, and it is possible that we will see it in Java 10.

这种异步 JDBC 包装器的想法并不新鲜,并且已经在 Scala 库 Slick 3 中实现。最后,非阻塞 JDBC 可能会出现在 Java 路线图上。正如 2016 年 9 月在 JavaOne 上宣布的那样,我们可能会在 Java 10 中看到它。

回答by Dmytro Boichenko

Based on this blogyou should rewrite your snippet in following way

基于此博客,您应该按以下方式重写您的代码段

@GetMapping(value = "/v1/measurements")
public Flux<Measurement> getMeasurements() {
    return Flux.defer(() -> Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L))))
           .subscribeOn(Schedulers.elastic());
}

回答by yousafsajjad

Spring data support reactive repository interface for Mongo and Cassandra.

Spring 数据支持 Mongo 和 Cassandra 的反应式存储库接口。

Spring data MongoDb Reactive Interface

Spring 数据 MongoDb 响应式接口

Spring Data MongoDB provides reactive repository support with Project Reactor and RxJava 1 reactive types. The reactive API supports reactive type conversion between reactive types.

Spring Data MongoDB 通过 Project Reactor 和 RxJava 1 反应类型提供反应存储库支持。反应式 API 支持反应式类型之间的反应式类型转换。

public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> {

    Flux<Person> findByLastname(String lastname);

    @Query("{ 'firstname': ?0, 'lastname': ?1}")
    Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);

    // Accept parameter inside a reactive type for deferred execution
    Flux<Person> findByLastname(Mono<String> lastname);

    Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname);

    @InfiniteStream // Use a tailable cursor
    Flux<Person> findWithTailableCursorBy();

}

public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> {

    Observable<Person> findByLastname(String lastname);

    @Query("{ 'firstname': ?0, 'lastname': ?1}")
    Single<Person> findByFirstnameAndLastname(String firstname, String lastname);

    // Accept parameter inside a reactive type for deferred execution
    Observable<Person> findByLastname(Single<String> lastname);

    Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname);

    @InfiniteStream // Use a tailable cursor
    Observable<Person> findWithTailableCursorBy();
}

回答by kkd927

Obtaining a Flux or a Mono doesn't necessarily mean it will run in a dedicated Thread. Instead, most operators continue working in the Thread on which the previous operator executed. Unless specified, the topmost operator (the source) itself runs on the Thread in which the subscribe() call was made.

获得 Flux 或 Mono 并不一定意味着它会在专用线程中运行。相反,大多数运算符继续在前一个运算符执行的线程中工作。除非指定,否则最顶层的运算符(源)本身运行在进行 subscribe() 调用的线程上。

If you have blocking persistence APIs (JPA, JDBC) or networking APIs to use, Spring MVC is the best choice for common architectures at least. It is technically feasible with both Reactor and RxJava to perform blocking calls on a separate thread but you would not be making the most of a non-blocking web stack.

如果您要使用阻塞持久性 API(JPA、JDBC)或网络 API,那么 Spring MVC 至少是通用架构的最佳选择。Reactor 和 RxJava 在单独的线程上执行阻塞调用在技术上是可行的,但您不会充分利用非阻塞 Web 堆栈。

So... How do I wrap a synchronous, blocking call?

所以......我如何包装一个同步的、阻塞的调用?

Use Callableto defer execution. And you should use Schedulers.elasticbecause it creates a dedicated thread to wait for the blocking resource without tying up some other resource.

使用Callable到延迟执行。您应该使用Schedulers.elastic它,因为它创建了一个专用线程来等待阻塞资源而不占用其他资源。

  • Schedulers.immediate() : Current thread.
  • Schedulers.single() : A single, reusable thread.
  • Schedulers.newSingle() : A per-call dedicated thread.
  • Schedulers.elastic() : An elastic thread pool. It creates new worker pools as needed, and reuse idle ones. This is a good choice for I/O blocking work for instance.
  • Schedulers.parallel() : A fixed pool of workers that is tuned for parallel work.
  • Schedulers.immediate() :当前线程。
  • Schedulers.single() :单个可重用线程。
  • Schedulers.newSingle() :每次调用的专用线程。
  • Schedulers.elastic() :弹性线程池。它根据需要创建新的工作池,并重用空闲的工作池。例如,这是 I/O 阻塞工作的不错选择。
  • Schedulers.parallel() :针对并行工作进行调整的固定工作线程池。

example:

例子:

Mono.fromCallable(() -> blockingRepository.save())
        .subscribeOn(Schedulers.elastic());