Step 1: Define the batch-context.xml with the JobLauncher and the JobRepository. A JobLauncher is responsible for starting a Job with a given set of job parameters. The Spring-provided implementation, SimpleJobLauncher, relies on a TaskExecutor to launch the jobs. If no specific TaskExecutor is set, then the Spring-provided default SyncTaskExecutor is used, which is suitable for testing purposes. A JobRepository implementation requires a set of execution DAOs to store its information. The MapJobRepositoryFactoryBean is a FactoryBean that automates the creation of a SimpleJobRepository using non-persistent, in-memory DAO implementations. This repository is only really intended for use in testing and rapid prototyping. The JobRegistryBeanPostProcessor registers Job beans with a JobRegistry.
<!-- define the job repository -->
Step 2: Define the steps necessary for the batch job. First, define the batch control step that will update the batch_control table accordingly.
<!-- In-memory (non-persistent) job repository created by MapJobRepositoryFactoryBean; intended for testing and rapid prototyping only. -->
<!-- NOTE: the "transactionManager" bean referenced below is not defined in this snippet and must be declared elsewhere. -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager" />
</bean>
<!-- Define the launcher and pass the jobRepository to it via setter injection. -->
<!-- No taskExecutor property is set, so the launcher falls back to its default synchronous executor. -->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<!-- Map-based registry that holds the Job beans by name. -->
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
<!-- Post-processor that registers every Job bean in the application context with the jobRegistry above. -->
<bean id="jobRegistryBeanPostProcessor" class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
Step 3: Define the relevant Java classes that are referenced in the above configuration file. Capture the batch control metadata as shown below.
<!-- Component scan: detects the annotated classes under the com.myapp.batch package and registers them as Spring beans. -->
<context:component-scan base-package="com.myapp.batch" />
<!-- Batch control step: reads the batch control record with batchControlReader and hands it to jobStartBatchControlWriter. -->
<!-- NOTE: the jobStartBatchControlWriter bean is not defined in this snippet. -->
<batch:step id="batchControlStep">
<batch:tasklet>
<!-- commit-interval="1": each item read is written and committed in its own chunk. -->
<batch:chunk reader="batchControlReader" writer="jobStartBatchControlWriter" commit-interval="1" />
<batch:listeners>
<!-- promotionListener makes selected step-context values available to later steps; batchReaderListener reports when no record was read. -->
<batch:listener ref="promotionListener" />
<batch:listener ref="batchReaderListener" />
</batch:listeners>
</batch:tasklet>
</batch:step>
<!-- Cursor-based JDBC reader that loads the batch_control row for the current job name. -->
<!-- NOTE: the dataSourceMyDs and batchControlRowMapper beans are not defined in this snippet. -->
<bean id="batchControlReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
<property name="dataSource" ref="dataSourceMyDs" />
<property name="sql">
<!-- The query text is taken from the public constant on the DAO so that it is defined in one place only. -->
<util:constant static-field="com.myapp.batch.dao.impl.BatchControlDaoImpl.SELECT_BY_JOB_NAME_SQL" />
</property>
<property name="rowMapper" ref="batchControlRowMapper" />
<!-- Supplies the value for the single "?" placeholder (job_name) of the query. -->
<property name="preparedStatementSetter" ref="batchControlReaderStatementSetter" />
</bean>
<!-- Binds the "jobName" job parameter to the reader's query. scope="step" is needed so that the -->
<!-- #{jobParameters[...]} expression is resolved when the step starts rather than at context start-up. -->
<bean id="batchControlReaderStatementSetter" scope="step" class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
<!-- The parameter is passed via the command line, e.g.: my_job_run.sh accountValueUpdateJob1 accountValueUpdateJob1.log -->
<!-- which runs: $JAVA_HOME/bin/java -classpath ${CLASSPATH} ${JOB_CLASS} batch-context.xml availableBalanceJob jobName=accountValueUpdateJob1 -->
<property name="parameters">
<list>
<value>#{jobParameters['jobName']}</value>
</list>
</property>
</bean>
<!-- Promotes the entries stored under the keys listed below from the step's ExecutionContext to the job's -->
<!-- ExecutionContext, so that later steps can read them. The key names are the static constants of BatchControl. -->
<bean id="promotionListener" class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
<property name="keys">
<list>
<util:constant static-field="com.myapp.batch.domain.BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO" />
<util:constant static-field="com.myapp.batch.domain.BatchControl.JOB_KEY_BATCH_ID" />
<util:constant static-field="com.myapp.batch.domain.BatchControl.ACC_FROM" />
<util:constant static-field="com.myapp.batch.domain.BatchControl.ACC_TO" />
</list>
</property>
</bean>
<!-- The listener class whose afterStep() method prints a message when the batch control record could not be loaded. -->
<bean id="batchReaderListener" class="com.myapp.batch.listener.BatchReaderListener" />
/**
 * Batch control metadata for a single job, together with the key names used to
 * pass some of these values between steps.
 */
public class BatchControl {
// Key names listed in the promotionListener bean of batch-context.xml.
public static final String JOB_KEY_LAST_PROCESSED_ACC_NO = "lastProcessedAccNo";
public static final String JOB_KEY_BATCH_ID = "batchId";
public static final String ACC_FROM = "accFrom";
public static final String ACC_TO = "accTo";
// Not referenced anywhere in this listing; presumably the user name recorded for batch updates (TODO confirm).
public static final String BATCH_USER = "batch";
// NOTE(review): these fields presumably mirror the batch_control columns selected by
// BatchControlDaoImpl.SELECT_BY_JOB_NAME_SQL, but several names differ from the column names
// (e.g. start_timestamp vs. startDateTime, account_no_from vs. accFrom); verify the row mapping.
private Long jobId;
private String jobName;
private DateTime startDateTime;
private DateTime endDateTime;
private String status;
private Integer accFrom;
private Integer accTo;
private Integer lastProcessedAccNo;
//getters and setters are omitted
}
The listener class that runs after the step execution and prints a message when the step's exit status is "FAILED", i.e. when no batch control record could be loaded for the given job name.
package com.myapp.batch.listener;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.step.NoWorkFoundStepExecutionListener;
/**
 * Step listener that reports a missing batch control record.
 *
 * <p>The exit status is decided by the parent listener; this class only adds a
 * console message when that status equals {@link ExitStatus#FAILED}.
 */
public class BatchReaderListener extends NoWorkFoundStepExecutionListener {

    /**
     * Delegates to the parent listener and prints the "jobName" job parameter
     * when the parent reports a failure.
     *
     * @param stepExecution the execution of the step that has just finished
     * @return the exit status produced by the parent listener, unchanged (may be {@code null})
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        final ExitStatus outcome = super.afterStep(stepExecution);

        // Nothing to report unless the parent listener flagged the step as failed.
        if (outcome == null || !outcome.equals(ExitStatus.FAILED)) {
            return outcome;
        }

        final String requestedJobName =
                stepExecution.getJobExecution().getJobInstance().getJobParameters().getString("jobName");
        System.out.println("Could not load batch control record with job name " + requestedJobName);
        return outcome;
    }
}
The Dao object that reads and updates the batch_control table.
package com.myapp.batch.dao.impl;
This will be concluded in part-3.
import org.joda.time.DateTime;
import org.springframework.batch.core.BatchStatus;
import org.springframework.jdbc.core.simple.ParameterizedBeanPropertyRowMapper;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.orm.hibernate3.HibernateTemplate;
import org.springframework.orm.hibernate3.SessionFactoryUtils;
import org.springframework.orm.hibernate3.support.HibernateDaoSupport;
import com.myapp.batch.dao.BatchControlDao;
import com.myapp.batch.domain.BatchControl;
/**
 * DAO that reads and updates rows of the {@code batch_control} table.
 *
 * <p>All statements are plain SQL executed through a {@link SimpleJdbcTemplate}
 * that shares the data source of the supplied {@link HibernateTemplate}.
 *
 * <p>NOTE(review): the select statement reads a {@code job_id} column whereas the
 * update statements filter on {@code batch_id}; confirm the column names against
 * the actual table definition.
 */
public class BatchControlDaoImpl extends HibernateDaoSupport implements BatchControlDao {

    /**
     * Selects the batch control row for a job name. Public because batch-context.xml
     * injects it into the batchControlReader bean via {@code util:constant}.
     */
    public static final String SELECT_BY_JOB_NAME_SQL =
            "select job_id, job_name, start_timestamp, end_timestamp, status,"
                    + " account_no_from, account_no_to, last_account_no from batch_control where job_name = ?";

    /** Resets the timestamps, status and last processed account number when a job starts. */
    private static final String JOB_START_UPDATE_SQL =
            "update batch_control set start_timestamp = ?, end_timestamp = ?, status = ?, last_account_no=?"
                    + " where batch_id = ?";

    /** Records the end timestamp and final status of a job. */
    private static final String JOB_COMPLETE_UPDATE_SQL =
            "update batch_control set end_timestamp = ?, status = ? where batch_id = ?";

    /** Records the last account number that has been processed. */
    private static final String LAST_PROCESSED_ACC_UPDATE_SQL =
            "update batch_control set last_account_no = ? where batch_id = ?";

    private final SimpleJdbcTemplate simpleJdbcTemplate;

    /**
     * Creates the DAO on top of an existing Hibernate template.
     *
     * @param hibernateTemplate template whose session factory provides the data source
     *                          used for the JDBC statements of this class
     */
    public BatchControlDaoImpl(HibernateTemplate hibernateTemplate) {
        setHibernateTemplate(hibernateTemplate);
        this.simpleJdbcTemplate =
                new SimpleJdbcTemplate(SessionFactoryUtils.getDataSource(hibernateTemplate.getSessionFactory()));
    }

    /**
     * Marks a job as started: sets the start timestamp to the current time, clears the
     * end timestamp, sets the status to {@code STARTED} and stores the given account number.
     *
     * @param batchId         identifier of the batch control row to update
     * @param last_account_no value written to the {@code last_account_no} column
     */
    public void init(Long batchId, Integer last_account_no) {
        // The parameter was originally declared as "Id" while the statement below used
        // "batchId", which did not compile; it is now named like in the sibling methods.
        simpleJdbcTemplate.update(JOB_START_UPDATE_SQL, new DateTime().toDate(), null, BatchStatus.STARTED.name(),
                last_account_no, batchId);
    }

    /**
     * Records the completion of a job: sets the end timestamp to the current time and
     * stores the given status.
     *
     * @param batchId identifier of the batch control row to update
     * @param status  final status of the job; its name is written to the {@code status} column
     */
    public void saveJobComplete(Long batchId, BatchStatus status) {
        simpleJdbcTemplate.update(JOB_COMPLETE_UPDATE_SQL, new DateTime().toDate(), status.name(), batchId);
    }

    /**
     * Stores the last processed account number.
     *
     * <p>NOTE(review): the account number is a {@code String} here but an {@code Integer}
     * in {@link #init(Long, Integer)}; confirm which type the column expects.
     *
     * @param last_account_no value written to the {@code last_account_no} column
     * @param batchId         identifier of the batch control row to update
     */
    public void saveLastProcessedId(String last_account_no, Long batchId) {
        simpleJdbcTemplate.update(LAST_PROCESSED_ACC_UPDATE_SQL, last_account_no, batchId);
    }

    /**
     * Loads the batch control record of a job.
     *
     * <p>NOTE(review): the bean-property row mapper matches column names to property
     * names; several selected columns (e.g. {@code start_timestamp},
     * {@code account_no_from}) do not obviously match the {@code BatchControl}
     * properties. Verify that all fields are actually populated.
     *
     * @param jobName job name to look up
     * @return the matching batch control record
     */
    public BatchControl getBatchControl(String jobName) {
        return simpleJdbcTemplate.queryForObject(SELECT_BY_JOB_NAME_SQL,
                ParameterizedBeanPropertyRowMapper.newInstance(BatchControl.class), jobName);
    }
}