Note: this page is a Chinese-English parallel translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow. Original URL: http://stackoverflow.com/questions/26313876/

Time: 2020-11-02 09:41:15  Source: igfitidea

Spring Batch how to process list of data before write in a Step

java spring batch-processing spring-batch

Asked by Aure77

I am trying to read client data from a database and write the processed data to a flat file. But I need to process the whole result of the ItemReader before writing the data.

For example, I am reading Client objects from database rows:

public class Client {
    private String id;
    private String subscriptionCode;
    private Boolean activated;
}

But I want to count and write how many users are activated, grouped by subscriptionCode:

public class Subscription {
    private String subscriptionCode;
    private Integer activatedUserCount;
}

I don't know how to do that using ItemReader/ItemProcessor/ItemWriter. Can you help me?

BatchConfiguration:

@CommonsLog
@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Client, Client> chunk(1000)
                .reader(new ListItemReader<Client>(new ArrayList<Client>() { // Just for test
                    {
                        add(Client.builder().id("1").subscriptionCode("AA").activated(true).build());
                        add(Client.builder().id("2").subscriptionCode("BB").activated(true).build());
                        add(Client.builder().id("3").subscriptionCode("AA").activated(false).build());
                        add(Client.builder().id("4").subscriptionCode("AA").activated(true).build());
                    }
                }))
                .processor(new ItemProcessor<Client, Client>() {
                    public Client process(Client item) throws Exception {
                        log.info(item);
                        return item;
                    }
                })
                .writer(new ItemWriter<Client>() {
                    public void write(List<? extends Client> items) throws Exception {
                        // Only here can I use the List of Client
                        // How can I process this list beforehand to fill Subscription objects?
                    }
                })
                .build();
    }

    @Bean
    public Job job1(Step step1) throws Exception {
        return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).build();
    }
}

Main application:

public class App {
    public static void main(String[] args) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
        System.exit(SpringApplication.exit(SpringApplication.run(BatchConfiguration.class, args)));
    }
}

Accepted answer by Aure77

I found a solution based on ItemProcessor:

@Bean
public Step step1() {
  return stepBuilderFactory.get("step1")
      .<Client, Subscription> chunk(1000)
      .reader(new ListItemReader<Client>(new ArrayList<Client>() {
        {
          add(Client.builder().id("1").subscriptionCode("AA").activated(true).build());
          add(Client.builder().id("2").subscriptionCode("BB").activated(true).build());
          add(Client.builder().id("3").subscriptionCode("AA").activated(false).build());
          add(Client.builder().id("4").subscriptionCode("AA").activated(true).build());
        }
      }))
      .processor(new ItemProcessor<Client, Subscription>() {
        private List<Subscription> subscriptions;

        public Subscription process(Client item) throws Exception {
          for (Subscription s : subscriptions) { // try to retrieve existing element
            if (s.getSubscriptionCode().equals(item.getSubscriptionCode())) { // element found
              if(item.getActivated()) {
                s.getActivatedUserCount().incrementAndGet(); // increment user count
                log.info("Incremented subscription : " + s);
              }                             
              return null; // existing element -> skip
            }
          }
          // Create new Subscription; start the count at 1 only if this first client is activated
          Subscription subscription = Subscription.builder().subscriptionCode(item.getSubscriptionCode()).activatedUserCount(new AtomicInteger(item.getActivated() ? 1 : 0)).build();
          subscriptions.add(subscription);
          log.info("New subscription : " + subscription);
          return subscription;
        }

        @BeforeStep
        public void initList() {
          subscriptions = Collections.synchronizedList(new ArrayList<Subscription>());
        }

        @AfterStep
        public void clearList() {
          subscriptions.clear();
        }
      })
      .writer(new ItemWriter<Subscription>() {                  
        public void write(List<? extends Subscription> items) throws Exception {
          log.info(items);
          // do write stuff
        }                   
      })
      .build();
}

But I have to maintain a second list of Subscription objects inside the ItemProcessor (I don't know whether this is thread-safe and efficient). What do you think about this solution?
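
On the thread-safety and efficiency question: Collections.synchronizedList only synchronizes individual calls, so iterating over the list and the "look up, then add" sequence in the processor are not atomic, and the lookup is a linear scan per item. One possible alternative, shown here as a plain-Java sketch with no Spring Batch types, keeps the counters in a ConcurrentHashMap keyed by subscription code. The class name SubscriptionCounter and its methods record and activatedCount are hypothetical names chosen for illustration; they are not part of the original answer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not from the original post): thread-safe per-code counters.
class SubscriptionCounter {
    // subscriptionCode -> number of activated clients seen so far
    private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();

    // Records one client. Returns true only for the first client seen with this code,
    // which is the case where the processor would emit a new Subscription.
    boolean record(String subscriptionCode, boolean activated) {
        boolean[] created = {false};
        // computeIfAbsent creates the counter atomically, at most once per key
        AtomicInteger counter = counts.computeIfAbsent(subscriptionCode, code -> {
            created[0] = true;
            return new AtomicInteger();
        });
        if (activated) {
            counter.incrementAndGet();
        }
        return created[0];
    }

    // Returns the activated count for a code, or 0 if the code was never recorded.
    int activatedCount(String subscriptionCode) {
        AtomicInteger counter = counts.get(subscriptionCode);
        return counter == null ? 0 : counter.get();
    }

    public static void main(String[] args) {
        SubscriptionCounter counter = new SubscriptionCounter();
        // Same sample data as the test reader in the question
        counter.record("AA", true);
        counter.record("BB", true);
        counter.record("AA", false);
        counter.record("AA", true);
        System.out.println("AA=" + counter.activatedCount("AA"));
        System.out.println("BB=" + counter.activatedCount("BB"));
    }
}
```

With this shape, the processor could return a new Subscription when record returns true and null otherwise, and each lookup is a hash access instead of a scan over the whole list.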

Answer by Luca Basso Ricci

If I understand your comments correctly, you need to produce a summary of activated accounts, right?
You can create a Subscription for every Client you are processing and, with an ItemWriteListener.afterWrite, write the Subscription items created above to the database.
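
The accumulate-then-write idea can be sketched in plain Java, without any Spring Batch types. This is only an illustration under stated assumptions: the class SummarySketch and its methods afterChunk and summary are hypothetical stand-ins for wherever a listener callback would run, the nested Client class is a minimal copy of the one in the question, and the "code;count" line format is an assumed flat-file layout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not Spring Batch API): fold each chunk into running totals.
class SummarySketch {
    // Minimal stand-in for the Client class from the question
    static final class Client {
        final String id;
        final String subscriptionCode;
        final boolean activated;

        Client(String id, String subscriptionCode, boolean activated) {
            this.id = id;
            this.subscriptionCode = subscriptionCode;
            this.activated = activated;
        }
    }

    // subscriptionCode -> activated client count; TreeMap keeps codes sorted
    private final Map<String, Integer> totals = new TreeMap<>();

    // Assumed hook: called once per chunk to add its items to the totals
    void afterChunk(List<Client> chunk) {
        for (Client c : chunk) {
            totals.merge(c.subscriptionCode, c.activated ? 1 : 0, Integer::sum);
        }
    }

    // Assumed hook: called after the last chunk, one output line per subscription code
    List<String> summary() {
        List<String> lines = new ArrayList<>();
        totals.forEach((code, count) -> lines.add(code + ";" + count));
        return lines;
    }

    public static void main(String[] args) {
        SummarySketch sketch = new SummarySketch();
        // Two chunks of two items each, using the sample data from the question
        sketch.afterChunk(List.of(new Client("1", "AA", true), new Client("2", "BB", true)));
        sketch.afterChunk(List.of(new Client("3", "AA", false), new Client("4", "AA", true)));
        sketch.summary().forEach(System.out::println); // prints AA;2 then BB;1
    }
}
```

The totals are complete only after the last chunk has been folded in, so the summary is best written once at the end rather than once per chunk.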
