Spring Batch:一个读者,多个处理器和作家

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/18999724/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-08 06:14:40  来源:igfitidea点击:

Spring Batch: One reader, multiple processors and writers

springspring-batchitemprocessoritemwriter

提问by danidemi

In Spring batch I need to pass the items read by an ItemReader to two different processors and writer. What I'm trying to achieve is that...

在 Spring 批处理中,我需要将 ItemReader 读取的项目传递给两个不同的处理器和编写器。我想要实现的是......

                        +---> ItemProcessor#1 ---> ItemWriter#1
                        |
ItemReader ---> item ---+
                        |
                        +---> ItemProcessor#2 ---> ItemWriter#2

This is needed because items written by ItemWriter#1 should be processed in a completely different way compared to the ones written by ItemWriter#2. Moreover, ItemReader reads item from a database, and the queries it executes are so computational expensive that executing the same query twice should be discarded.

这是必需的,因为与 ItemWriter#2 编写的项目相比,由 ItemWriter#1 编写的项目应该以完全不同的方式进行处理。此外,ItemReader 从数据库中读取项目,并且它执行的查询计算成本很高,因此应该丢弃两次执行相同的查询。

Any hint about how to achieve such set up ? Or, at least, a logically equivalent set up ?

关于如何实现这种设置的任何提示?或者,至少是逻辑上等效的设置?

采纳答案by Luca Basso Ricci

This solution is valid if your item should be processed by processor #1 and processor #2

如果您的项目应由处理器 #1 和处理器 #2 处理,则此解决方案有效

You have to create a processor #0 with this signature:

您必须使用此签名创建一个处理器 #0:

class Processor0<Item, CompositeResultBean>

where CompositeResultBeanis a bean defined as

CompositeResultBean一个 bean在哪里定义为

class CompositeResultBean {
  Processor1ResultBean result1;
  Processor2ResultBean result2;
}

In your Processor #0 just delegate work to processors #1 and #2 and put result in CompositeResultBean

在您的处理器 #0 中,只需将工作委托给处理器 #1 和 #2 并将结果放入 CompositeResultBean

CompositeResultBean Processor0.process(Item item) {
  final CompositeResultBean r = new CompositeResultBean();
  r.setResult1(processor1.process(item));
  r.setResult2(processor2.process(item));
  return r;
}

Your own writer is a CompositeItemWriterthat delegate to writer CompositeResultBean.result1or CompositeResultBean.result2(look at PropertyExtractingDelegatingItemWriter, maybe can help)

您自己的作家是作家的CompositeItemWriter委托人CompositeResultBean.result1CompositeResultBean.result2(查看PropertyExtractingDelegatingItemWriter,也许可以提供帮助)

回答by Juan Pablo G

I followed Luca's suggestion to use PropertyExtractingDelegatingItemWriteras writer and I was able to work with two different entities in one single step.

我遵循了 Luca 的建议,将其PropertyExtractingDelegatingItemWriter用作作家,并且我能够在一个步骤中处理两个不同的实体。

First of all what I did was to define a DTO that stores the two entities/results from the processor

首先我所做的是定义一个 DTO 来存储来自处理器的两个实体/结果

public class DatabaseEntry {
    private AccessLogEntry accessLogEntry;
    private BlockedIp blockedIp;

    public AccessLogEntry getAccessLogEntry() {
        return accessLogEntry;
    }

    public void setAccessLogEntry(AccessLogEntry accessLogEntry) {
        this.accessLogEntry = accessLogEntry;
    }

    public BlockedIp getBlockedIp() {
        return blockedIp;
    }

    public void setBlockedIp(BlockedIp blockedIp) {
        this.blockedIp = blockedIp;
    }
}

Then I passed this DTO to the writer, a PropertyExtractingDelegatingItemWriterclass where I define two customized methods to write the entities into the database, see my writer code below:

然后我将这个 DTO 传递给了PropertyExtractingDelegatingItemWriter编写器,在该类中我定义了两个自定义方法将实体写入数据库,请参阅下面的编写器代码:

@Configuration
public class LogWriter extends LogAbstract {
    @Autowired
    private DataSource dataSource;

    @Bean()
    public PropertyExtractingDelegatingItemWriter<DatabaseEntry> itemWriterAccessLogEntry() {
        PropertyExtractingDelegatingItemWriter<DatabaseEntry> propertyExtractingDelegatingItemWriter = new PropertyExtractingDelegatingItemWriter<DatabaseEntry>();
        propertyExtractingDelegatingItemWriter.setFieldsUsedAsTargetMethodArguments(new String[]{"accessLogEntry", "blockedIp"});
        propertyExtractingDelegatingItemWriter.setTargetObject(this);
        propertyExtractingDelegatingItemWriter.setTargetMethod("saveTransaction");
        return propertyExtractingDelegatingItemWriter;
    }

    public void saveTransaction(AccessLogEntry accessLogEntry, BlockedIp blockedIp) throws SQLException {
        writeAccessLogTable(accessLogEntry);
        if (blockedIp != null) {
            writeBlockedIp(blockedIp);
        }

    }

    private void writeBlockedIp(BlockedIp entry) throws SQLException {
        PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO blocked_ips (ip,threshold,startDate,endDate,comment) VALUES (?,?,?,?,?)");
        statement.setString(1, entry.getIp());
        statement.setInt(2, threshold);
        statement.setTimestamp(3, Timestamp.valueOf(startDate));
        statement.setTimestamp(4, Timestamp.valueOf(endDate));
        statement.setString(5, entry.getComment());
        statement.execute();
    }

    private void writeAccessLogTable(AccessLogEntry entry) throws SQLException {
        PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO log_entries (date,ip,request,status,userAgent) VALUES (?,?,?,?,?)");
        statement.setTimestamp(1, Timestamp.valueOf(entry.getDate()));
        statement.setString(2, entry.getIp());
        statement.setString(3, entry.getRequest());
        statement.setString(4, entry.getStatus());
        statement.setString(5, entry.getUserAgent());
        statement.execute();
    }
}

With this approach you can get the wanted inital behaviour from a single reader for processing multiple entities and save them in a single step.

使用这种方法,您可以从单个读取器获得所需的初始行为,以处理多个实体并在一个步骤中保存它们。

回答by Sebastien Lorber

You can use a CompositeItemProcessorand CompositeItemWriter

你可以使用一个CompositeItemProcessorCompositeItemWriter

It won't look exactly like your schema, it will be sequential, but it will do the job.

它看起来不像你的模式,它是顺序的,但它会完成这项工作。

回答by danidemi

this is the solution I came up with.

这是我想出的解决方案。

So, the idea is to code a new Writer that "contains" both an ItemProcessor and an ItemWriter. Just to give you an idea, we called it PreprocessoWriter, and that's the core code.

因此,我们的想法是编写一个“包含”一个 ItemProcessor 和一个 ItemWriter 的新 Writer。只是为了给您一个想法,我们将其称为 PreprocessoWriter,这就是核心代码。

private ItemWriter<O> writer;
private ItemProcessor<I, O> processor;

@Override
public void write(List<? extends I> items) throws Exception {
    List<O> toWrite = new ArrayList<O>();
    for (I item : items) {
        toWrite.add(processor.process(item));
    }
    writer.write(toWrite);
}

There's a lot of things being left aside. Management of ItemStream, for instance. But in our particular scenario this was enough.

有很多事情被搁置一边。例如,ItemStream 的管理。但在我们的特定场景中,这已经足够了。

So you can just combine multiple PreprocessorWriter with CompositeWriter.

所以你可以将多个 PreprocessorWriter 与 CompositeWriter 结合起来。

回答by Tristan

There is an other solution if you have a reasonable amount of items (like less than 1 Go) : you can cache the result of your select into a collection wrapped in a Spring bean.

如果您有合理数量的项目(例如少于 1 个 Go),还有另一种解决方案:您可以将选择的结果缓存到封装在 Spring bean 中的集合中。

Then u can just read the collection twice with no cost.

然后您可以免费阅读该系列两次。