Step 1: The annotated Java classes can be referenced directly by their bean names because of the following line in batch-context.xml:
<!--
  Component scanning: classes under the com.myapp.batch package that carry
  Spring stereotype annotations (for example @Component("someName")) are
  registered as beans, so the rest of the configuration can refer to them by
  name without an explicit <bean> definition.
-->
<context:component-scan base-package="com.myapp.batch" />
Step 2: You define the batch job as shown below. It can have a number of steps. Some details are omitted to keep it simple.
Step 3: Provide the relevant annotated and referenced class implementations. The listener "appJobExecutionListener" is defined as shown below. As you can see the class is annotated with @Component("appJobExecutionListener").
<!--
  Job definition: step1 (inherits from the "batchControlStep" parent step) followed
  by the chunk-oriented step2 that reads accounts, calculates the available cash
  and writes the result.
-->
<batch:job id="availableBalanceJob">
    <batch:listeners>
        <!-- Job-level listener; the annotated class is found via context:component-scan. -->
        <batch:listener ref="appJobExecutionListener" />
    </batch:listeners>

    <batch:step id="step1" parent="batchControlStep">
        <!-- NOTE(review): "end" finishes the job without marking it failed; confirm that
             this is intended when step1 fails (batch:fail would mark the job FAILED). -->
        <batch:end on="FAILED" />
        <batch:next on="*" to="step2" />
    </batch:step>

    <batch:step id="step2">
        <batch:tasklet>
            <batch:chunk
                reader="accountReader"
                processor="availableCashProcessor"
                writer="availableCashWriter"
                commit-interval="100"
                retry-limit="1000000">
                <!-- deadlock retry -->
                <batch:retryable-exception-classes>
                    <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
                </batch:retryable-exception-classes>
            </batch:chunk>
            <batch:listeners>
                <batch:listener ref="batchItemListener" />
                <!-- Item read/write error listener. It must be registered on the step:
                     listeners declared at job level only receive job callbacks, so the
                     onReadError/onWriteError methods would not be invoked there. -->
                <batch:listener ref="itemFailureLoggerListener" />
            </batch:listeners>
        </batch:tasklet>
    </batch:step>
</batch:job>
The "itemFailureLoggerListener" can be defined in a similar fashion.
package com.myapp.batch.listener;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.myapp.batch.dao.BatchControlDao;
import com.myapp.batch.domain.BatchControl;
@Component("appJobExecutionListener")
public class AppJobExecutionListener implements JobExecutionListener {
@Autowired
private BatchControlDao batchControlDao;
public void beforeJob(JobExecution arg0) {
}
public void afterJob(JobExecution jobExecution) {
Long batchControlId = (Long) jobExecution.getExecutionContext().get(BatchControl.JOB_KEY_BATCH_CONTROL_ID);
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
batchControlDao.saveJobComplete(batchControlId, BatchStatus.COMPLETED);
System.out.println("Job completed: " + jobExecution.getJobInstance().getJobName());
} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
batchControlDao.saveJobComplete(batchControlId, BatchStatus.FAILED);
System.out.println("Job failed: " + jobExecution.getJobInstance().getJobName() + " "
+ jobExecution.getFailureExceptions());
}
}
}
The "accountReader" is the ItemReader that reads the relevant account numbers from the accounts table. It basically gets a list of items (i.e. Accounts) to be processed by the processor.
package com.myapp.batch.listener;
import org.springframework.batch.core.listener.ItemListenerSupport;
import org.springframework.stereotype.Component;
@Component("itemFailureLoggerListener")
public class ItemFailureLoggerListener extends ItemListenerSupport {
public void onReadError(Exception ex) {
System.out.println("Encountered error on read", ex);
}
public void onWriteError(Exception ex, Object item) {
System.out.println("Encountered error on write", ex);
}
}
The AccountRowMapper is defined as follows to map the results
<!--
  ItemReader for the chunk step: a JdbcCursorItemReader that runs the query
  below against the "dataSourceMyDs" data source and turns each row into an
  item through the "accountRowMapper" bean.
-->
<bean id="accountReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
<property name="dataSource" ref="dataSourceMyDs" />
<!--
  The query has three positional parameters, all compared against account_no:
  a strict lower bound followed by an inclusive from/to range. Rows are
  returned in account_no order.
-->
<property name="sql">
<value>
<![CDATA[
select a.account_no, a.account_name, a.debit, a.credit
from accounts a
where a.account_no > ? and
a.account_no >= ? and
a.account_no <= ?
order by a.account_no
]]>
</value>
</property>
<property name="rowMapper" ref="accountRowMapper" />
<!-- Supplies the values for the three "?" placeholders, in order. -->
<property name="preparedStatementSetter" ref="accountReaderStatementSetter" />
</bean>
The where clause arguments can be supplied as shown below by reading from the "jobExecutionContext"
package com.myapp.batch.rowmapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
import com.myapp.batch.domain.Account;
@Component("accountRowMapper")
public class AccountRowMapper implements RowMapper {
public static final String ACC_NO = "account_no";
//...
public Account mapRow(ResultSet rs, int rowNum) throws SQLException {
Account account = new Account();
account.setAccNo(rs.getInteger(ACC_NO));
//.....
return account;
}
}
The availableCashProcessor is the ItemProcessor that calculates the available cash balance. You can delegate this work to a number of processor classes as shown below. It is invoked once for each item (i.e. each Account).
<!--
  Supplies the positional parameters of the accountReader query. The bean is
  step scoped so that the #{jobExecutionContext[...]} expressions are evaluated
  against the running job's execution context rather than at application
  start-up.
  The list order must match the order of the "?" placeholders in the SQL:
  lastProcessedAccNo, accFrom, accTo.
  NOTE(review): the writer stores the last processed account number in the
  step execution context; promoting it to the job execution context is not
  shown in this example. Confirm it is configured elsewhere.
-->
<bean id="accountReaderStatementSetter" scope="step" class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
<property name="parameters">
<list>
<value>#{jobExecutionContext['lastProcessedAccNo']}</value>
<value>#{jobExecutionContext['accFrom']}</value>
<value>#{jobExecutionContext['accTo']}</value>
</list>
</property>
</bean>
The processor class is shown below
<!--
  Composite processor referenced by the chunk's processor attribute. Each
  delegate in the list is applied in order; further processors can be added
  as additional <bean> entries.
  The delegate class name must match the processor class defined in
  com.myapp.batch.processor (CalculateValueProcessor).
  NOTE(review): that class is also annotated with
  @Component("availableCashProcessor"), i.e. the same bean name as this
  definition. Confirm which definition is meant to win.
-->
<bean id="availableCashProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <bean class="com.myapp.batch.processor.CalculateValueProcessor" />
        </list>
    </property>
</bean>
The "availableCashWriter" is the ItemWriter that writes the results.
package com.myapp.batch.processor;
import java.util.ArrayList;
import java.util.List;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Item processor that derives the available cash of an account from its
 * credit and debit values.
 */
@Component("availableCashProcessor")
public class CalculateValueProcessor implements ItemProcessor<Account, Account> {
/**
 * Sets the available cash on the given account to credit minus debit and
 * hands the same instance on to the next stage.
 *
 * @param account the account read by the "accountReader"
 * @return the same account instance, with its available cash populated
 */
public Account process(Account account) {
// Deliberately simple arithmetic for the example; BigDecimal is not used here.
account.setAvailableCash(account.getCredit() - account.getDebit());
return account;
}
}
Finally, the batchItemListener saves the lastProcessedAccNo after each write.
package com.myapp.batch.writer;
import java.util.List;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
//...
@Component("availableCashWriter")
public class AvailableCashWriter implements ItemWriter<Account> {
private StepExecution stepExecution;
@Autowired
private AccountDao accountDao; //injected via spring confih
@Autowired
private ModelAccountPortfolioValueDao modelAccountPortValueDao;
public void write(List<? extends Account> items) throws Exception {
for (Account account : items) {
// Save portfolio value for account
Account readAcc = accountDao.getAccount(account.getAccNo());
readAcc.setAvalableCash(account.getAvailableCash());
accountDao.updateAccount(readAcc);
// Save to step execution context so that it can be promoted to job execution context
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put(BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO, account.getAccNo());
}
}
@BeforeStep
public void setStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
package com.myapp.batch.listener;
import java.util.List;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.listener.ItemListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.myapp.batch.dao.BatchControlDao;
import com.myapp.batch.domain.BatchControl;
@Component("batchItemListener")
public class ItemBatchListener extends ItemListenerSupport {
private StepExecution stepExecution;
@Autowired
private BatchControlDao batchControlDao;
public void afterWrite(List items) {
Long batchId =
(Long) this.stepExecution.getJobExecution().getExecutionContext().get(
BatchControl.JOB_KEY_BATCH_ID);
String lastProcessedAccNo =
(String) this.stepExecution.getExecutionContext().get(BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO);
batchControlDao.saveLastProcessedAccNo(lastProcessedAccNo, batchId);
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}