Spring Batch job reading from multiple sources in Java

Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow. Original question: http://stackoverflow.com/questions/21304364/

Date: 2020-08-13 07:58:35 · Source: igfitidea

Spring batch Job read from multiple sources

Tags: java, database, spring, spring-batch

Asked by xedo

How can I read items from multiple databases? I already know that it is possible with files: the following example reads from multiple files.


...
<job id="readMultiFileJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="step1">
    <tasklet>
        <chunk reader="multiResourceReader" writer="flatFileItemWriter"
            commit-interval="1" />
    </tasklet>
    </step>
</job>
...
<bean id="multiResourceReader"
    class="org.springframework.batch.item.file.MultiResourceItemReader">
    <property name="resources" value="file:csv/inputs/domain-*.csv" />
    <property name="delegate" ref="flatFileItemReader" />
</bean>
...


I have three reader beans like this, one per database:


<bean id="database2" class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="name" value="database2Reader" />
    <property name="dataSource" ref="dataSource2" />
    <property name="sql" value="select image from object where image like '%/images/%'" />
    <property name="rowMapper">
        <bean class="sym.batch.ImagesRowMapper2" />
    </property>
</bean>

Accepted answer by Luca Basso Ricci

There isn't a ready-to-use component that performs what you ask; the only solution is to write a custom ItemReader<> that delegates to JdbcCursorItemReader (or to HibernateCursorItemReader, or to any generic ItemReader implementation).
You need to prepare all the necessary pieces (datasource, session, real database readers) and bind all delegated readers to your custom reader.


EDIT: You need to simulate a loop using recursion of ItemReader.read() and maintain reader and delegate state across job restarts.


import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;

class MyItemReader<T> implements ItemReader<T>, ItemStream {
  private ItemReader<T>[] delegates;
  private int delegateIndex;
  private ItemReader<T> currentDelegate;
  private ExecutionContext stepExecutionContext;

  public void setDelegates(ItemReader<T>[] delegates) {
    this.delegates = delegates;
  }

  @BeforeStep
  private void beforeStep(StepExecution stepExecution) {
    this.stepExecutionContext = stepExecution.getExecutionContext();
  }

  public T read() throws Exception {
    T item = null;
    if(null != currentDelegate) {
      item = currentDelegate.read();
      if(null == item) {
        // Current delegate is exhausted: close it and advance the index
        ((ItemStream)this.currentDelegate).close();
        this.currentDelegate = null;
        this.delegateIndex++;
      }
    }
    // Move to next delegate if previous was exhausted!
    if(null == item && this.delegateIndex < this.delegates.length) {
      this.currentDelegate = this.delegates[this.delegateIndex];
      ((ItemStream)this.currentDelegate).open(this.stepExecutionContext);
      update(this.stepExecutionContext);
      // Recurse into read() to simulate a loop through the delegates
      item = read();
    }
    return item;
  }

  public void open(ExecutionContext ctx) {
    // During open restore the last active reader and its state
    if(ctx.containsKey("index")) {
      this.delegateIndex = ctx.getInt("index");
      if(this.delegateIndex < this.delegates.length) {
        this.currentDelegate = this.delegates[this.delegateIndex];
        ((ItemStream)this.currentDelegate).open(ctx);
      }
    }
  }

  public void update(ExecutionContext ctx) {
    // Save the current delegate index and the delegate's own state
    ctx.putInt("index", this.delegateIndex);
    if(null != this.currentDelegate) {
      ((ItemStream)this.currentDelegate).update(ctx);
    }
  }

  // ItemStream.close() takes no arguments in Spring Batch 2+
  public void close() {
    if(null != this.currentDelegate) {
      ((ItemStream)this.currentDelegate).close();
    }
  }
}


<bean id="myItemReader" class="path.to.MyItemReader">
  <property name="delegates">
    <array>
      <ref bean="itemReader1"/>
      <ref bean="itemReader2"/>
      <ref bean="itemReader3"/>
    </array>
  </property>
</bean>
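The exhaust-then-advance control flow of read() can be illustrated outside Spring with plain iterators. This is only a sketch of the pattern: the CompositeReader class and the sample item values below are made up for illustration and are not part of Spring Batch (and it omits the ItemStream restart bookkeeping).

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Minimal stand-in for the delegating reader: each "delegate" is an
// iterator, and read() returns null once every delegate is exhausted.
class CompositeReader<T> {
    private final List<Iterator<T>> delegates;
    private int index;

    CompositeReader(List<Iterator<T>> delegates) {
        this.delegates = delegates;
    }

    T read() {
        while (index < delegates.size()) {
            Iterator<T> current = delegates.get(index);
            if (current.hasNext()) {
                return current.next();   // item from the active delegate
            }
            index++;                     // delegate exhausted: advance
        }
        return null;                     // all delegates exhausted
    }

    public static void main(String[] args) {
        CompositeReader<String> reader = new CompositeReader<>(Arrays.asList(
                Arrays.asList("db1-row1", "db1-row2").iterator(),
                Arrays.asList("db2-row1").iterator()));
        StringBuilder out = new StringBuilder();
        for (String item = reader.read(); item != null; item = reader.read()) {
            out.append(item).append(' ');
        }
        System.out.println(out.toString().trim());
        // prints: db1-row1 db1-row2 db2-row1
    }
}
```

The consumer only ever calls read() until it returns null, exactly as a Spring Batch chunk step does with an ItemReader.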


EDIT2: Remember to set the name property; this is NECESSARY to let MyItemReader.read() work correctly.


<bean id="itemReader1" class="org.springframework.batch.item.database.JdbcCursorItemReader">
  <property name="name" value="itemReader1" />
  <!-- Set other properties -->
</bean>

Answered by nsylmz

I suggest a workaround, though it is a bit tricky. If we assume that a table in one of your MySQL datasources is the base table, and that every row in it corresponds to a row in a table of the other MySQL datasource (like a join between tables that live in different datasources), you can do the join inside your batch job's item reader (via its RowMapper). An example of this approach:


Spring DataSource configuration:

<bean id="mySqlDataSource1" class="org.apache.commons.dbcp.BasicDataSource">
    <property name="driverClassName" value="${database1.driverClassName}"/>
    <property name="url" value="${database1.url}"/>
    <property name="username" value="${database1.username}"/>
    <property name="password" value="${database1.password}"/>
    <property name="validationQuery" value="${database1.validationQuery}"/>
</bean>

<bean id="mySqlDataSource2" class="org.apache.commons.dbcp.BasicDataSource">
    <property name="driverClassName" value="${database2.driverClassName}"/>
    <property name="url" value="${database2.url}"/>
    <property name="username" value="${database2.username}"/>
    <property name="password" value="${database2.password}"/>
    <property name="validationQuery" value="${database2.validationQuery}"/>
</bean>

Your batch-job.xml:

<bean id="multiDatasourceReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="mySqlDataSource1" />
    <property name="rowMapper" ref="multiDatasourceRowMapper" />
    <property name="sql">
        <value>
            SELECT * FROM xyz
        </value>
    </property>
</bean>

<bean id="multiDatasourceRowMapper" class="yourpackage.MultiDatasourceRowMapper" scope="step">
    <property name="secondDataSource" ref="mySqlDataSource2" />
    <property name="secondSql">
        <value>
            SELECT * FROM abc
        </value>
    </property>
</bean>

Your RowMapper looks like:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.sql.DataSource;

import org.springframework.jdbc.core.RowMapper;

public class MultiDatasourceRowMapper implements RowMapper<String> {

    private DataSource secondDataSource;

    private String secondSql;

    public String mapRow(ResultSet rs, int rowNum) throws SQLException {
        // Use try-with-resources so the connection and statement are
        // always closed, even if the lookup throws
        try (Connection conn = secondDataSource.getConnection();
             PreparedStatement prep = conn.prepareStatement(secondSql)) {

            // Do Something: bind values from rs, execute prep, and
            // combine both rows into the mapped result

            return "";
        }
    }

    public void setSecondDataSource(DataSource secondDataSource) {
        this.secondDataSource = secondDataSource;
    }

    public void setSecondSql(String secondSql) {
        this.secondSql = secondSql;
    }

}

Answered by stefan.m

I suggest a simple workaround that may not suit every case, but will be useful in many:

Simply define:

  • 2 readers, one for each database
  • 2 steps
  • one job that contains both steps

The two steps are nearly identical: they reference the same processor and writer, but have different readers. They will be executed consecutively.

Whether this setup works depends on the processor and writer (whether they still behave correctly when called from different steps). In my case, it was sufficient to set appendAllowed=true on the writer, so that both steps could write to the same file.
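A sketch of what the job definition could look like, in the same XML style as the question. The ids (readMultiSourceJob, itemReader1, itemReader2, itemProcessor, flatFileItemWriter) are placeholders for your own beans, not names from the original answer:

```xml
<job id="readMultiSourceJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="readFromDb1" next="readFromDb2">
        <tasklet>
            <chunk reader="itemReader1" processor="itemProcessor"
                writer="flatFileItemWriter" commit-interval="10" />
        </tasklet>
    </step>
    <step id="readFromDb2">
        <tasklet>
            <chunk reader="itemReader2" processor="itemProcessor"
                writer="flatFileItemWriter" commit-interval="10" />
        </tasklet>
    </step>
</job>
...
<!-- Let the second step append to the file the first step created -->
<bean id="flatFileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="appendAllowed" value="true" />
    <!-- resource, lineAggregator, etc. -->
</bean>
```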