spring java - 如何为spring批处理数据和业务数据配置单独的数据源?我应该这样做吗?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/25256487/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-07 23:58:51  来源:igfitidea点击:

How to java-configure separate datasources for spring batch data and business data? Should I even do it?

springspring-batchspring-bootspring-java-config

提问by amacoder

My main job does only read operations and the other one does some writing but on MyISAM enginewhich ignores transactions, so I wouldn't require necessarily transaction support. How can I configure Spring Batchto have its own datasource for the JobRepository, separate from the one holding the business data? The initial one datasource-configurations is done like the following:

我的主要工作只做读操作,另一个做一些写操作,但MyISAM engine忽略事务,所以我不需要一定的事务支持。我如何配置Spring Batch为 拥有自己的数据源JobRepository,与保存业务数据的数据源分开?最初的一个数据源配置是这样完成的:

@Configuration
public class StandaloneInfrastructureConfiguration {

    @Autowired
    Environment env;

    @Bean
    public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
      LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
      em.setDataSource(dataSource());
      em.setPackagesToScan(new String[] { "org.podcastpedia.batch.*" });

      JpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
      em.setJpaVendorAdapter(vendorAdapter);
      em.setJpaProperties(additionalJpaProperties());

      return em;
    }

    Properties additionalJpaProperties() {
          Properties properties = new Properties();
          properties.setProperty("hibernate.hbm2ddl.auto", "none");
          properties.setProperty("hibernate.dialect", "org.hibernate.dialect.MySQL5Dialect");
          properties.setProperty("hibernate.show_sql", "true");

          return properties;
    }

    @Bean
    public DataSource dataSource(){

       return DataSourceBuilder.create()
                .url(env.getProperty("db.url"))
                .driverClassName(env.getProperty("db.driver"))
                .username(env.getProperty("db.username"))
                .password(env.getProperty("db.password"))
                .build();          
    }

    @Bean
    public PlatformTransactionManager transactionManager(EntityManagerFactory emf){
      JpaTransactionManager transactionManager = new JpaTransactionManager();
      transactionManager.setEntityManagerFactory(emf);

      return transactionManager;
    }
}

and then it is imported in the Job's configuration class where the @EnableBatchProcessingannotation automagically makes use of it. My initial thought was to try to set the configuration class extend the DefaultBatchConfigurer, but then I get a

然后它被导入到Job的配置类中,@EnableBatchProcessing注释自动使用它。我最初的想法是尝试设置配置类扩展DefaultBatchConfigurer,但后来我得到了

BeanCurrentlyInCreationException ( org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name jobBuilders: Requested bean is currently in creation: Is there an unresolvable circular reference?):

BeanCurrentlyInCreationException(org.springframework.beans.factory.BeanCurrentlyInCreationException:创建名为 jobBuilders 的 bean 时出错:当前正在创建请求的 bean:是否存在无法解析的循环引用?):

@Configuration
@EnableBatchProcessing
@Import({StandaloneInfrastructureConfiguration.class, NotifySubscribersServicesConfiguration.class})
public class NotifySubscribersJobConfiguration extends DefaultBatchConfigurer {

    @Autowired
    private JobBuilderFactory jobBuilders;

    @Autowired
    private StepBuilderFactory stepBuilders;

    @Autowired
    private DataSource dataSource;

    @Autowired
    Environment env;

    @Override
    @Autowired
    public void setDataSource(javax.sql.DataSource dataSource) {
        super.setDataSource(batchDataSource());
    }

    private DataSource batchDataSource(){          
       return DataSourceBuilder.create()
                .url(env.getProperty("batchdb.url"))
                .driverClassName(env.getProperty("batchdb.driver"))
                .username(env.getProperty("batchdb.username"))
                .password(env.getProperty("batchdb.password"))
                .build();          
    } 

    @Bean
    public ItemReader<User> notifySubscribersReader(){

        JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<User>();
        String sql = "select * from users where is_email_subscriber is not null";

        reader.setSql(sql);
        reader.setDataSource(dataSource);
        reader.setRowMapper(rowMapper());       

        return reader;
    }
........
}   

Any thoughts are more than welcomed. The project is available on GitHub - https://github.com/podcastpedia/podcastpedia-batch

任何想法都非常受欢迎。该项目可在 GitHub 上找到 - https://github.com/podcastpedia/podcastpedia-batch

Thanks a bunch.

谢谢一堆。

回答by Frozen

Ok, this is strange but it works. Moving the datasources to it's own configuration class works just fine and one is able to autowire.

好吧,这很奇怪,但它有效。将数据源移动到它自己的配置类工作得很好,并且能够自动装配。

The example is a multi-datasource version of Spring Batch Service Example:

该示例是Spring Batch Service Example的多数据源版本:

DataSourceConfiguration:

数据源配置

public class DataSourceConfiguration {

    @Value("classpath:schema-mysql.sql")
    private Resource schemaScript;

    @Bean
    @Primary
    public DataSource hsqldbDataSource() throws SQLException {
        final SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
        dataSource.setDriver(new org.hsqldb.jdbcDriver());
        dataSource.setUrl("jdbc:hsqldb:mem:mydb");
        dataSource.setUsername("sa");
        dataSource.setPassword("");
        return dataSource;
    }

    @Bean
    public JdbcTemplate jdbcTemplate(final DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Bean
    public DataSource mysqlDataSource() throws SQLException {
        final SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
        dataSource.setDriver(new com.mysql.jdbc.Driver());
        dataSource.setUrl("jdbc:mysql://localhost/spring_batch_example");
        dataSource.setUsername("test");
        dataSource.setPassword("test");
        DatabasePopulatorUtils.execute(databasePopulator(), dataSource);
        return dataSource;
    }

    @Bean
    public JdbcTemplate mysqlJdbcTemplate(@Qualifier("mysqlDataSource") final DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    private DatabasePopulator databasePopulator() {
        final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(schemaScript);
        return populator;
    }
}

BatchConfiguration:

批量配置

@Configuration
@EnableBatchProcessing
@Import({ DataSourceConfiguration.class, MBeanExporterConfig.class })
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public ItemReader<Person> reader() {
        final FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        reader.setResource(new ClassPathResource("sample-data.csv"));
        reader.setLineMapper(new DefaultLineMapper<Person>() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(new String[] { "firstName", "lastName" });
                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
                    {
                        setTargetType(Person.class);
                    }
                });
            }
        });
        return reader;
    }

    @Bean
    public ItemProcessor<Person, Person> processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public ItemWriter<Person> writer(@Qualifier("mysqlDataSource") final DataSource dataSource) {
        final JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public Job importUserJob(final Step s1) {
        return jobs.get("importUserJob").incrementer(new RunIdIncrementer()).flow(s1).end().build();
    }

    @Bean
    public Step step1(final ItemReader<Person> reader,
            final ItemWriter<Person> writer, final ItemProcessor<Person, Person> processor) {
        return steps.get("step1")
                .<Person, Person> chunk(1)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}

回答by Jared Knipp

I have my data sources in a separate configuration class. In the batch configuration, we extend DefaultBatchConfigurer and override the setDataSourcemethod, passing in the specific database to use with Spring Batch with a @Qualifier. I was unable to get this to work using the constructor version, but the setter method worked for me.

我的数据源位于单独的配置类中。在批处理配置中,我们扩展了 DefaultBatchConfigurer 并覆盖了setDataSource方法,通过@Qualifier 传入要与 Spring Batch 一起使用的特定数据库。我无法使用构造函数版本使其工作,但 setter 方法对我有用。

My Reader, Processor, and Writer's are in their own self contained classes, along with the steps.

我的 Reader、Processor 和 Writer's 在它们自己的自包含类中,以及步骤。

This is using Spring Boot 1.1.8 & Spring Batch 3.0.1. Note:We had a different setup for a project using Spring Boot 1.1.5 that did not work the same on the newer version.

这是使用 Spring Boot 1.1.8 和 Spring Batch 3.0.1。 注意:我们对使用 Spring Boot 1.1.5 的项目进行了不同的设置,但在较新版本上的工作方式不同。

package org.sample.config.jdbc;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;

import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;

/**
 * The Class DataSourceConfiguration.
 *
 */
@Configuration
public class DataSourceConfig {

    private final static Logger log = LoggerFactory.getLogger(DataSourceConfig.class);

    @Autowired private Environment env;

    /**
     * Siphon data source.
     *
     * @return the data source
     */
    @Bean(name = "mainDataSource")
    @Primary
    public DataSource mainDataSource() {

        final String user = this.env.getProperty("db.main.username");
        final String password = this.env.getProperty("db.main.password");
        final String url = this.env.getProperty("db.main.url");

        return this.getMysqlXADataSource(url, user, password);
    }

    /**
     * Batch data source.
     *
     * @return the data source
     */
    @Bean(name = "batchDataSource", initMethod = "init", destroyMethod = "close")
    public DataSource batchDataSource() {

        final String user = this.env.getProperty("db.batch.username");
        final String password = this.env.getProperty("db.batch.password");
        final String url = this.env.getProperty("db.batch.url");

        return this.getAtomikosDataSource("metaDataSource", this.getMysqlXADataSource(url, user, password));
    }

    /**
     * Gets the mysql xa data source.
     *
     * @param url the url
     * @param user the user
     * @param password the password
     * @return the mysql xa data source
     */
    private MysqlXADataSource getMysqlXADataSource(final String url, final String user, final String password) {

        final MysqlXADataSource mysql = new MysqlXADataSource();
        mysql.setUser(user);
        mysql.setPassword(password);
        mysql.setUrl(url);
        mysql.setPinGlobalTxToPhysicalConnection(true);

        return mysql;
    }

    /**
     * Gets the atomikos data source.
     *
     * @param resourceName the resource name
     * @param xaDataSource the xa data source
     * @return the atomikos data source
     */
    private AtomikosDataSourceBean getAtomikosDataSource(final String resourceName, final MysqlXADataSource xaDataSource) {

        final AtomikosDataSourceBean atomikos = new AtomikosDataSourceBean();
        atomikos.setUniqueResourceName(resourceName);
        atomikos.setXaDataSource(xaDataSource);
        atomikos.setMaxLifetime(3600);
        atomikos.setMinPoolSize(2);
        atomikos.setMaxPoolSize(10);

        return atomikos;
    }

}


package org.sample.settlement.batch;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * The Class BatchConfiguration.
 *
 */
@Configuration
@EnableBatchProcessing
public class BatchConfiguration extends DefaultBatchConfigurer {
    private final static Logger log = LoggerFactory.getLogger(BatchConfiguration.class);
    @Autowired private JobBuilderFactory jobs;
    @Autowired private StepBuilderFactory steps;
    @Autowired private PlatformTransactionManager transactionManager;
    @Autowired @Qualifier("processStep") private Step processStep;

    /**
     * Process payments job.
     *
     * @return the job
     */
    @Bean(name = "processJob")
    public Job processJob() {
        return this.jobs.get("processJob")
                    .incrementer(new RunIdIncrementer())
                    .start(processStep)
                    .build();
    }

    @Override
    @Autowired
    public void setDataSource(@Qualifier("batchDataSource") DataSource batchDataSource) {
        super.setDataSource(batchDataSource);
    }
}

回答by Philippe

Per https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#howto-two-datasources:

根据https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#howto-two-datasources

@Bean
@Primary
@ConfigurationProperties("app.datasource.first")
public DataSourceProperties firstDataSourceProperties() {
    return new DataSourceProperties();
}

@Bean
@Primary
@ConfigurationProperties("app.datasource.first")
public DataSource firstDataSource() {
    return firstDataSourceProperties().initializeDataSourceBuilder().build();
}

@Bean
@ConfigurationProperties("app.datasource.second")
public DataSourceProperties secondDataSourceProperties() {
    return new DataSourceProperties();
}

@Bean
@ConfigurationProperties("app.datasource.second")
public DataSource secondDataSource() {
    return secondDataSourceProperties().initializeDataSourceBuilder().build();
}

In the application properties, you can use regular datasource properties:

在应用程序属性中,您可以使用常规数据源属性:

app.datasource.first.type=com.zaxxer.hikari.HikariDataSource
app.datasource.first.maximum-pool-size=30

app.datasource.second.url=jdbc:mysql://localhost/test
app.datasource.second.username=dbuser
app.datasource.second.password=dbpass
app.datasource.second.max-total=30

回答by gyoder

Have you tried something like this already?

你已经尝试过这样的事情了吗?

@Bean(name="batchDataSource")
public DataSource batchDataSource(){          
       return DataSourceBuilder.create()
                .url(env.getProperty("batchdb.url"))
                .driverClassName(env.getProperty("batchdb.driver"))
                .username(env.getProperty("batchdb.username"))
                .password(env.getProperty("batchdb.password"))
                .build();          
} 

and then mark the other datasource with a @Primary, and use an @Qualifier in your batch config to specify that you want to auotwire the batchDataSource bean.

然后用@Primary 标记另一个数据源,并在批处理配置中使用@Qualifier 来指定要自动装配batchDataSource bean。

回答by Tejucb

Assuming you have 2 data sources, one for spring batch metadata such as job details[lets say CONFIGDB] and other for your business data [lets say AppDB]:

假设您有 2 个数据源,一个用于 spring 批处理元数据,例如作业详细信息[假设为 CONFIGDB],另一个用于您的业务数据 [假设为 AppDB]:

Inject CONFIGDB into jobRepository, like this:

将 CONFIGDB 注入 jobRepository,如下所示:

 <bean id="jobRepository"
    class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
    <property name="transactionManager" ref="transactionManager" />
    <property name="dataSource" ref="CONFIGDB" />
    <property name="databaseType" value="db2" />
    <property name="tablePrefix" value="CONFIGDB.BATCH_" />
  </bean>

Now you can inject the AppDB dartasource into your DAO's OR Writers if any like..

现在您可以将 AppDB dartasource 注入到您的 DAO 的 OR Writers 中(如果有的话)。

   <bean id="DemoItemWriter" class="com.demoItemWriter">
     <property name="dataSource" ref="AppDB" />     
   </bean>

OR

或者

you can do define a resource and Inject this AppDB with jndi lookup in the class where its needed like:

您可以定义一个资源并在需要的类中使用 jndi 查找注入此 AppDB,例如:

public class ExampleDAO {

@Resource(lookup = "java:comp/env/jdbc/AppDB")
DataSource ds;

}

}

回答by C. Asselmeyer

As suggested by Frozen in his answertwo DataSources did the trick for me. Additionally I needed to define a BatchDataSourceInitializerto properly initialize the batch DataSource as suggested in Michael Minella's answer to this related question.

正如 Frozen 在他的回答中所建议的,两个 DataSource 对我有用。此外,我需要定义一个BatchDataSourceInitializer以正确初始化批处理数据源,如 Michael Minella对此相关问题回答中所建议的。

DataSource configuration

数据源配置

@Configuration
public class DataSourceConfiguration {

    @Bean
    @Primary
    @ConfigurationProperties("domain.datasource")
    public DataSource domainDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean("batchDataSource")
    @ConfigurationProperties("batch.datasource")
    public DataSource batchDataSource() {
        return DataSourceBuilder.create().build();
    }
}

Batch Configuration

批量配置

@Configuration
@EnableBatchProcessing
public class BatchConfiguration extends DefaultBatchConfigurer {

    @Override
    @Autowired
    public void setDataSource(@Qualifier("batchDataSource") DataSource batchDataSource) {
        super.setDataSource(batchDataSource);
    }

    @Bean
    public BatchDataSourceInitializer batchDataSourceInitializer(@Qualifier("batchDataSource") DataSource batchDataSource,
            ResourceLoader resourceLoader) {
        return new BatchDataSourceInitializer(batchDataSource, resourceLoader, new BatchProperties());
    }

application.properties:

应用程序属性:

# Sample configuraion using a H2 in-memory DB
domain.datasource.jdbcUrl=jdbc:h2:mem:domain-ds;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
domain.datasource.username=sa
domain.datasource.password=
domain.datasource.driver=org.h2.Driver

batch.datasource.jdbcUrl=jdbc:h2:mem:batch-ds;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
batch.datasource.username=sa
batch.datasource.password=
batch.datasource.driver=org.h2.Driver

回答by sakya

Add @BatchDataSourceto the batch data source if your spring boot's version is 2.2.0or later.

如果您的 spring boot 版本为2.2.0或更高版本,请将@BatchDataSource添加到批处理数据源。

Details of this annotation is as follows:

该注解的详细信息如下:

/**
 * Qualifier annotation for a DataSource to be injected into Batch auto-configuration. Can
 * be used on a secondary data source, if there is another one marked as
 * {@link Primary @Primary}.
 *
 * @author Dmytro Nosan
 * @since 2.2.0
 */
@Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Qualifier
public @interface BatchDataSource {

}

for example:

例如:

@BatchDataSource
@Bean("batchDataSource")
public DataSource batchDataSource(@Qualifier("batchDataSourceProperties") DataSourceProperties dataSourceProperties) {
        return dataSourceProperties
                .initializeDataSourceBuilder()
                .type(HikariDataSource.class)
                .build();
}