My previous 3-part Spring Batch tutorial covered a high-level overview with examples. This tutorial demonstrates how to wrap Spring's FlatFileItemReader with your own reader so that you can peek at the next record and group the records the way you want. For example, suppose you have a CSV file like the one shown below, where the first line is a header record and each "OPENBAL" record starts a new group that also contains the transaction records that follow it.
|
"Portfolio1","29/02/2012","11/03/2012",
"Portfolio1","Account1","OPENBAL", 2000.00
"Portfolio1 ","Account1","PURCHASE",1000.00
"Portfolio1 ","Account1","EXPENSE",500.00
"Portfolio1 ","Account1","ADJUSTMENT ", 200.00
"Portfolio1","Account1","OPENBAL ", 12000.00
"Portfolio1 ","Account2","PURCHASE",1000.00
"Portfolio1 ","Account3","ADJUSTMENT",1000.00
So, we need to write a custom file reader that can peek into the next record before reading it.
Step 1: Snippets of the Spring Batch context configuration file, e.g. applicationContext-myapp.xml.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:task="http://www.springframework.org/schema/task" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:file="http://www.springframework.org/schema/integration/file"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
<!-- Load myapp.properties from the classpath so that ${...} placeholders used in this file can be resolved -->
<context:property-placeholder location="classpath:myapp.properties" />
<!-- Enable annotation-driven transaction management (@Transactional).
     NOTE(review): this element does not switch on @Resource injection; that needs
     context:annotation-config or component scanning, which this snippet does not show. -->
<tx:annotation-driven />
<!--
    Job definition: a single chunk-oriented step that reads grouped records through
    groupMyAppDetailsReader and hands them to myAppFileItemWriter, committing every 10 items.
-->
<job id="myAppJob" xmlns="http://www.springframework.org/schema/batch">
    <!-- job-level listener -->
    <listeners>
        <listener ref="myAppJobExecutionListener" />
    </listeners>
    <step id="loadMyAppFeedData">
        <tasklet transaction-manager="transactionManager">
            <!-- step-level listener that captures the step and job execution contexts -->
            <listeners>
                <listener ref="stepExecutionListener" />
            </listeners>
            <chunk reader="groupMyAppDetailsReader"
                   writer="myAppFileItemWriter"
                   commit-interval="10" />
        </tasklet>
    </step>
</job>
<!-- Spring-supplied FlatFileItemReader that reads the CSV file line by line.
     The file to read is taken from the 'dataFileName' job parameter. -->
<bean id="myAppFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="resource" value="#{jobParameters['dataFileName']}" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <!-- Column names in file order. These are the names the field set mapper
                         looks fields up by, so the two lists must stay in sync. -->
                    <property name="names" value="portfolioCd,accountCd,txnCd,cashValue" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.myapp.mapper.MyAppFieldSetMapper" />
            </property>
        </bean>
    </property>
    <!-- The first line is the header record: it is not mapped as data but passed to the callback below -->
    <property name="linesToSkip" value="1" />
    <property name="skippedLinesCallback" ref="myAppFileHeaderLineCallbackHandler" />
</bean>
<!-- Custom reader that groups the records; internally it delegates the actual line reading
     to Spring's FlatFileItemReader defined as myAppFileItemReader -->
<bean id="groupMyAppDetailsReader" class="com.myapp.item.reader.MyAppFileItemReader">
    <property name="delegate" ref="myAppFileItemReader" />
</bean>
<!-- Custom item writer used by the chunk step to write each group of records -->
<bean id="myAppFileItemWriter" class="com.myapp.item.writer.MyAppItemWriter" />
<!-- Step listener that captures the step and job execution contexts so that other beans
     (reader, processor, writer, callback handlers) can be injected with it and read them -->
<bean id="stepExecutionListener" class="com.myapp.util.StepExecutionListenerCtxInjecter" />
</beans>
Step 2: The custom reader can be implemented as shown below. The key points are peeking at the next record to enable grouping, and using the Spring-provided FlatFileItemReader as a delegate to read each CSV line.
package com.myapp.item.reader;
import java.util.ArrayList;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import com.myapp.model.TransactionDetail;
import com.myapp.model.MyAppPortfolioParent;
/**
 * Reader that groups the flat records produced by a delegate reader into parent items.
 *
 * <p>Each call to {@link #read()} takes the next record from the delegate as the balance
 * record of a new {@link MyAppPortfolioParent} and then keeps collecting the following
 * records as its transaction details until a record with transaction code {@code OPENBAL}
 * (compared case-insensitively) or the end of the input is reached. That record is kept in
 * a one-element look-ahead buffer and becomes the balance record of the next group.
 *
 * <p>NOTE(review): the look-ahead buffer is not written to the {@link ExecutionContext};
 * presumably a restart after a failure resumes one record late. Verify before relying on
 * restartability.
 */
public class MyAppFileItemReader implements ItemStreamReader<MyAppPortfolioParent> {

    /** Transaction code that marks the first record of a new group. */
    private static final String OPENING_BALANCE_CD = "OPENBAL";

    /** Reader that supplies the individual records. */
    private ItemStreamReader<TransactionDetail> delegate;

    /** Look-ahead buffer: a record already fetched from the delegate but not yet consumed. */
    private TransactionDetail curItem = null;

    /**
     * Reads the next group of records.
     *
     * @return the next parent with its transaction details, or {@code null} when the
     *         delegate has no more records
     * @throws Exception if the delegate fails to read or parse a record; the failure is
     *         propagated instead of being swallowed, because returning {@code null} or a
     *         half-filled group after an error would look like a normal end of input
     */
    @Override
    public MyAppPortfolioParent read() throws Exception {
        TransactionDetail balance = peek();
        if (balance == null) {
            return null;
        }
        curItem = null; // the balance record is consumed

        MyAppPortfolioParent parent = new MyAppPortfolioParent();
        parent.setBalanceDetail(balance);
        parent.setTxnDetails(new ArrayList<TransactionDetail>());

        TransactionDetail detail = peek();
        while (detail != null && !OPENING_BALANCE_CD.equalsIgnoreCase(detail.getTxnCd())) {
            parent.getTxnDetails().add(detail);
            curItem = null; // consumed, so the next peek() fetches a fresh record
            detail = peek();
        }
        // If the loop stopped on an opening balance record, it stays in curItem for the next call.
        return parent;
    }

    /**
     * Returns the next record without consuming it.
     *
     * @return the buffered record, fetching one from the delegate if the buffer is empty;
     *         {@code null} when the delegate is exhausted
     * @throws Exception if the delegate fails to read a record
     */
    public TransactionDetail peek() throws Exception, UnexpectedInputException, ParseException {
        if (curItem == null) {
            curItem = delegate.read();
        }
        return curItem;
    }

    /** Closes the delegate. */
    @Override
    public void close() throws ItemStreamException {
        delegate.close();
    }

    /** Opens the delegate with the given execution context. */
    @Override
    public void open(ExecutionContext arg0) throws ItemStreamException {
        delegate.open(arg0);
    }

    /** Lets the delegate store its state in the given execution context. */
    @Override
    public void update(ExecutionContext arg0) throws ItemStreamException {
        delegate.update(arg0);
    }

    /**
     * @param delegate the reader that supplies the individual records
     */
    public void setDelegate(ItemStreamReader<TransactionDetail> delegate) {
        this.delegate = delegate;
    }
}
Step 3: The utility class that can be used to inject the step and job execution contexts into your reader, processor, or writer classes.
package com.myapp.util;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
/**
 * Holder for the execution contexts of the running step and of its job.
 *
 * <p>The contexts are captured in {@link #beforeStep(StepExecution)} and exposed through
 * the two getters so that beans holding a reference to this object can read and write them.
 */
public class StepExecutionListenerCtxInjecter {

    /** Context of the step passed to the last {@code beforeStep} call. */
    private ExecutionContext stepContext;

    /** Context of the job execution that step belongs to. */
    private ExecutionContext jobContext;

    /**
     * Captures the step and job execution contexts of the given step execution.
     *
     * @param stepExecution the step execution that is about to start
     */
    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        this.stepContext = stepExecution.getExecutionContext();
        this.jobContext = stepExecution.getJobExecution().getExecutionContext();
    }

    /**
     * @return the captured step execution context, or {@code null} if
     *         {@code beforeStep} has not been called yet
     */
    public ExecutionContext getStepExecutionCtx() {
        return this.stepContext;
    }

    /**
     * @return the captured job execution context, or {@code null} if
     *         {@code beforeStep} has not been called yet
     */
    public ExecutionContext getJobExecutionCtx() {
        return this.jobContext;
    }
}
Step 4: As you can see in the Spring config file, we skip the first record, which is the header record, and define a LineCallbackHandler to handle it. Here is the implementation of this handler.
package com.myapp.handler;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.LineCallbackHandler;
import org.springframework.stereotype.Component;
import com.myapp.dao.MyAppingDao;
import com.myapp.model.MyAppMeta;
import com.myapp.util.CashforecastingUtil;
import com.myapp.util.StepExecutionListenerCtxInjecter;
/**
 * Callback that receives the skipped header line of the feed file.
 *
 * <p>It converts the header into a {@link MyAppMeta}, logically deletes the current records
 * for it through the DAO, and stores the metadata in the job execution context under
 * {@link #FEED_HEADER_DATA}.
 */
@Component(value = "myAppFileHeaderLineCallbackHandler")
public class MyAppFileHeaderCallbackHandler implements LineCallbackHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyAppFileHeaderCallbackHandler.class);

    /** Key under which the parsed header metadata is stored in the job execution context. */
    public static final String FEED_HEADER_DATA = "feedHeaderData";

    // Declared with the DAO type that this file imports (com.myapp.dao.MyAppingDao).
    @Resource(name = "myappFeedDao")
    private MyAppingDao myappDao;

    @Resource(name = "stepExecutionListener")
    private StepExecutionListenerCtxInjecter stepExecutionListener;

    /**
     * Handles the header line of the feed file.
     *
     * @param headerLine the raw header line as read from the file
     */
    @Override
    public void handleLine(String headerLine) {
        LOGGER.debug("header line: {}", headerLine);

        // Convert the CSV header line into a metadata object.
        // NOTE(review): the import list names com.myapp.util.CashforecastingUtil, not MyAppUtil;
        // confirm which class actually provides getMyAppMetaFromHeader.
        MyAppMeta cfMeta = MyAppUtil.getMyAppMetaFromHeader(headerLine, null);

        // Logically delete the current records before the new feed is loaded.
        int noOfRecordsLogicallyDeleted = myappDao.logicallyDelete(cfMeta);
        LOGGER.info("No of records logically deleted: {}", noOfRecordsLogicallyDeleted);

        // Save the metadata in the job execution context.
        stepExecutionListener.getJobExecutionCtx().put(FEED_HEADER_DATA, cfMeta);
    }
}
Step 5: The FlatFileItemReader has a mapper defined to map each row to an object. We need to define this mapper, which is invoked for each CSV line that is read and converts its fields into an object, as shown below.
package com.myapp.mapper;
import java.math.BigDecimal;
import java.text.ParseException;
import org.apache.commons.lang.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
import com.myapp.model.MyAppDetail;
/**
 * Maps one tokenized CSV line to a {@link MyAppDetail}.
 *
 * <p>Fields are looked up by name ({@code portfolioCd}, {@code accountCd}, {@code txnCd},
 * {@code cashValue}), so the line tokenizer must be configured with exactly these names.
 */
public class MyAppFieldSetMapper implements FieldSetMapper<MyAppDetail> {

    // The logger is created for this class; the original referenced a class that is not part of this file.
    private final static Logger logger = LoggerFactory.getLogger(MyAppFieldSetMapper.class);

    /**
     * Converts a field set into a detail record.
     *
     * @param fs the tokenized line, may be {@code null}
     * @return the populated detail, or {@code null} when {@code fs} is {@code null}
     * @throws BindException declared by the {@link FieldSetMapper} contract
     */
    @Override
    public MyAppDetail mapFieldSet(FieldSet fs) throws BindException {
        if (fs == null) {
            return null;
        }
        MyAppDetail detail = new MyAppDetail();
        detail.setPortfolioCd(fs.readString("portfolioCd"));
        detail.setAccountCd(fs.readString("accountCd"));
        detail.setTxnCd(fs.readString("txnCd"));
        // A missing amount is stored as zero rather than null.
        BigDecimal cashValue = fs.readBigDecimal("cashValue");
        detail.setCashValue(cashValue != null ? cashValue : BigDecimal.ZERO);
        return detail;
    }
}
Step 6: The writer class that is responsible for writing a group of items (i.e. parent and children records) to the database.
package com.myapp.item.writer;
import java.util.List;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;
import com.myapp.dao.MyAppingDao;
import com.myapp.handler.MyAppFileHeaderCallbackHandler;
import com.myapp.model.MyAppDetail;
import com.myapp.model.MyAppMeta;
import com.myapp.model.MyAppParent;
import com.myapp.util.StepExecutionListenerCtxInjecter;

/**
 * Writes each group of records (a balance record plus its transaction records) to the
 * database through the DAO.
 *
 * <p>The header metadata and the optional {@code batchJobId} are read from the job
 * execution context exposed by the injected step listener.
 */
public class MyAppItemWriter implements ItemWriter<MyAppParent> {

    private final static Logger logger = LoggerFactory.getLogger(MyAppItemWriter.class);

    @Resource(name = "stepExecutionListener")
    private StepExecutionListenerCtxInjecter stepExecutionListener; // to get the step and job contexts

    @Resource(name = "myappFeedDao")
    private MyAppingDao myappDao; // dao class for saving records into database

    /**
     * Saves every parent of the chunk together with its transaction details.
     *
     * @param portfolioDetails the grouped items of the current chunk
     * @throws IllegalStateException if no header metadata is found in the job execution context
     * @throws RuntimeException wrapping any exception raised while saving
     */
    @Override
    public void write(List<? extends MyAppParent> portfolioDetails) {
        // Retrieve the header metadata stored in the job context under FEED_HEADER_DATA.
        MyAppMeta pfMeta = (MyAppMeta) stepExecutionListener.getJobExecutionCtx().get(
                MyAppFileHeaderCallbackHandler.FEED_HEADER_DATA);
        if (pfMeta == null) {
            // Fail with an explicit message instead of a bare NullPointerException below.
            throw new IllegalStateException("No header metadata found in the job execution context under key '"
                    + MyAppFileHeaderCallbackHandler.FEED_HEADER_DATA + "'");
        }

        // The batch job id is optional; -1 marks "not available".
        int batchJobId = -1;
        if (stepExecutionListener.getJobExecutionCtx().get("batchJobId") != null) {
            batchJobId = stepExecutionListener.getJobExecutionCtx().getInt("batchJobId");
        }
        pfMeta.setBatchJobId(batchJobId);

        try {
            for (MyAppParent cfp : portfolioDetails) {
                MyAppDetail bd = cfp.getBalanceDetail();

                // Save the balance record.
                int noOfRecords = myappDao.saveMyappBalance(bd, pfMeta);
                logger.info("No of cashforcast balance records inserted {}", noOfRecords);

                // Look up the id used to link the transaction records to the balance record.
                int syntheticId = myappDao.getmyappId(bd, pfMeta);

                // Save the transaction records of this group.
                List<MyAppDetail> txnDetails = cfp.getTxnDetails();
                for (MyAppDetail txd : txnDetails) {
                    myappDao.saveMyappDetail(txd, syntheticId);
                }
            }
        } catch (Exception e) {
            logger.error("myappItemWriter error", e);
            throw new RuntimeException(e);
        }

        logger.debug("Commiting chunks to the database ...... ");
    }
}