Spring Hibernate Batch Processing in Java with Spring batch -- part 3

This assumes that you have read the spring batch part 1 & part 2. This is the final part.

Step 1: The annotated Java classes are referenced directly due to following line in the batch-context.xml
  
<!-- Component-Scan automatically detects annotations in the Java classes. -->
<context:component-scan base-package="com.myapp.batch" />

Step 2: You define the batch job as shown below. It can have a number of steps. Some details are omitted to keep it simple.
 

<batch:job id="availableBalanceJob">
<batch:listeners>
<!-- annotated listener using context:component-scan-->
<batch:listener ref="appJobExecutionListener" />
<batch:listener ref="itemFailureLoggerListener" />
</batch:listeners>
<batch:step id="step1" parent="batchControlStep">
<batch:end on="FAILED" />
<batch:next on="*" to="step2" />
</batch:step>
<batch:step id="step2">
<batch:tasklet>
<batch:chunk
reader="accountReader"
processor="availableCashProcessor"
writer="availableCashWriter"
commit-interval="100"
retry-limit="1000000">
<!-- deadlock retry -->
<batch:retryable-exception-classes>
<batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
</batch:retryable-exception-classes>
</batch:chunk>
<batch:listeners>
<batch:listener ref="batchItemListener" />
</batch:listeners>
</batch:tasklet>
</batch:step>
</batch:job>

Step 3: Provide the relevant annotated and referenced class implementations. The listener "appJobExecutionListener" is defined as shown below. As you can see the class is annotated with @Component("appJobExecutionListener").
 

package com.myapp.batch.listener;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.myapp.batch.dao.BatchControlDao;
import com.myapp.batch.domain.BatchControl;

@Component("appJobExecutionListener")
public class AppJobExecutionListener implements JobExecutionListener {

@Autowired
private BatchControlDao batchControlDao;

public void beforeJob(JobExecution arg0) {
}

public void afterJob(JobExecution jobExecution) {
Long batchControlId = (Long) jobExecution.getExecutionContext().get(BatchControl.JOB_KEY_BATCH_CONTROL_ID);
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
batchControlDao.saveJobComplete(batchControlId, BatchStatus.COMPLETED);
System.out.println("Job completed: " + jobExecution.getJobInstance().getJobName());
} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
batchControlDao.saveJobComplete(batchControlId, BatchStatus.FAILED);
System.out.println("Job failed: " + jobExecution.getJobInstance().getJobName() + " "
+ jobExecution.getFailureExceptions());
}
}

}

The "itemFailureLoggerListener" can be defined in a similar fashion.
 
package com.myapp.batch.listener;

import org.springframework.batch.core.listener.ItemListenerSupport;
import org.springframework.stereotype.Component;

@Component("itemFailureLoggerListener")
public class ItemFailureLoggerListener extends ItemListenerSupport {


public void onReadError(Exception ex) {
System.out.println("Encountered error on read", ex);
}

public void onWriteError(Exception ex, Object item) {
System.out.println("Encountered error on write", ex);
}
}

The "accountReader" is the ItemReader that reads the relevant account numbers from the accounts table. It basically gets a list of items (i.e. Accounts) to be processed by the processor.
 

<bean id="accountReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
<property name="dataSource" ref="dataSourceMyDs" />
<property name="sql">
<value>
<![CDATA[
select a.account_no, a.account_name, a.debit, a.credit
from accounts a
where a.account_no > ? and
a.account_no >= ? and
a.account_no <= ?
order by a.account_no
]]>
</value>
</property>
<property name="rowMapper" ref="accountRowMapper" />
<property name="preparedStatementSetter" ref="accountReaderStatementSetter" />
</bean>

The AccountRowMapper is defined as follows to map the results
 

package com.myapp.batch.rowmapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

import com.myapp.batch..domain.Account;

@Component("accountRowMapper")
public class AccountRowMapper implements RowMapper {

public static final String ACC_NO = "account_no";
//...

public Account mapRow(ResultSet rs, int rowNum) throws SQLException {
Account account = new Account();
account.setAccNo(rs.getInteger(ACC_NO));
//.....
return account;
}

}

The where clause arguments can be supplied as shown below by reading from the "jobExecutionContext"
 

<bean id="accountReaderStatementSetter" scope="step" class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
<property name="parameters">
<list>
<value>#{jobExecutionContext['lastProcessedAccNo']}</value>
<value>#{jobExecutionContext['accFrom']}</value>
<value>#{jobExecutionContext['accTo']}</value>
</list>
</property>
</bean>


The availableCashProcessor is the ItemProcessor that calculates the available cash balance. You can delegate this work to a number of processor classes as shown below. It loops through each item (i.e Account)
 

<bean id="availableCashProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<bean class="com.myapp.batch.processor.CalculateCashValueProcessor" />
</list>
</property>
</bean>

The processor class is shown below
 

package com.myapp.batch.processor;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component("availableCashProcessor")
public class CalculateValueProcessor implements ItemProcessor<Account, Account> {


//the account read by the "accountReader" is passed in
public Account process(Account account) {
//simple and dirty without using BigDecimal
account.setAvailableCash(account.getCredit() - account.getDebit());
return account;
}

}

The "availaavailableCashWriter" is the ItemWriter that writes the results.
 

package com.myapp.batch.writer;

import java.util.List;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

//...

@Component("availableCashWriter")
public class AvailableCashWriter implements ItemWriter<Account> {

private StepExecution stepExecution;

@Autowired
private AccountDao accountDao; //injected via spring confih

@Autowired
private ModelAccountPortfolioValueDao modelAccountPortValueDao;

public void write(List<? extends Account> items) throws Exception {
for (Account account : items) {
// Save portfolio value for account
Account readAcc = accountDao.getAccount(account.getAccNo());
readAcc.setAvalableCash(account.getAvailableCash());
accountDao.updateAccount(readAcc);

// Save to step execution context so that it can be promoted to job execution context
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put(BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO, account.getAccNo());
}
}


@BeforeStep
public void setStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}

Finally, the batchItemListener that updates the lastProcessedAccNo.
 

package com.myapp.batch.listener;

import java.util.List;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.listener.ItemListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.myapp.batch.dao.BatchControlDao;
import com.myapp.batch.domain.BatchControl;

@Component("batchItemListener")
public class ItemBatchListener extends ItemListenerSupport {

private StepExecution stepExecution;

@Autowired
private BatchControlDao batchControlDao;

public void afterWrite(List items) {
Long batchId =
(Long) this.stepExecution.getJobExecution().getExecutionContext().get(
BatchControl.JOB_KEY_BATCH_ID);
String lastProcessedAccNo =
(String) this.stepExecution.getExecutionContext().get(BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO);
batchControlDao.saveLastProcessedAccNo(lastProcessedAccNo, batchId);
}

@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}