Apache Camel is a very powerful and can be used with Java to solve various problems. In this example, I will discuss how it can be used to generate feeds asynchronously. You don't have to write any multi-threading code. Sending email can be done with the MVEL expression language. Following is a simple example where some feed files are processed asynchronously by retrieving the job request from an in-memory queue. In the event of error, email notification is sent. "from" is a consumer, and "to" is the destination. |
Step 1: The dependency jars required defined via Maven pom file.
<properties>
<camel.version>2.10.3</camel.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- CAMEL -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-spring</artifactId>
<version>${camel.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring-javaconfig</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-mvel</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-bean-validator</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-beanio</artifactId>
<version>${camel.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
Step 2: The next step is to bootstrap Camel via Spring Java based configuration. Making the config class CamelContextAware will inject the camel context via the setter method.
package com.myapp.config;Step 3: Define the ProducerTemplate within your Service class. The ProducerTemplate interface allows you to send message exchanges to endpoints in a variety of different ways to make it easy to work with Camel Endpoint instances from Java code. This service class could be invoked via a RESTful Webservice.
import com.myapp.camel.JobHandlingRouteBuilder;
import com.myapp.service.MyAppService;
import javax.annotation.Resource;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyAppConfig implements CamelContextAware
{
private CamelContext context;
@Resource(name = "myAppService")
private MyAppService myAppService;
//constructor injection, where template is the bean id.
@Bean
public ProducerTemplate template()
{
if (context != null)
{
return context.createProducerTemplate();
}
}
@Bean
public RouteBuilder jobHandlingRouteBuilder()
{
return new JobHandlingRouteBuilder(myAppForecastService);
}
@Override
public void setCamelContext(CamelContext camelContext)
{
context = camelContext;
}
@Override
public CamelContext getCamelContext()
{
return context;
}
}
//............
@Service(value = "myAppService")
@Transactional(propagation = Propagation.SUPPORTS)
public class CashForecastServiceImpl implements CashForecastService
{
private ProducerTemplate template;
//since autowired, injected via MyAppConfig template() method with beanId being template.
@Autowired
public void setTemplate(ProducerTemplate template)
{
this.template = template;
}
@Override
public boolean handleGroupLevelFeedGenerationRequest()
{
//a pojo Java class with fields like account code, etc and getter/setter methods
FeedGenerationRequest request = new FeedGenerationRequest();
request.setAccountCode("12345");
//add headers and body. header will be used to determine processing logic by the RouteBuilder
Map<string, object> headers = new HashMap<string,object>();
headers.put(JobHandlingRouteBuilder.JOB_TYPE_HEADER, JobType.FEED_GENERATION);
//send it to an in memory BockingQueue define in the JobHandlingRouteBuilder class
template.sendBodyAndHeaders(JobHandlingRouteBuilder.JOB_QUEUE, request, headers);
return true;
}
public boolean generateFeed1(FeedGenerationRequest request){
//logic to generate feed goes here
}
public boolean generateFeed2(FeedGenerationRequest request){
//logic to generate feed goes here
}
//...........................
}
Step 4: Finally, define the camel route to queue and asynchronously generate the required feed files.
package com.myapp.camel;
import com.myapp.JobType;
import com.myapp.MyAppService;
import org.apache.camel.builder.RouteBuilder;
public class JobHandlingRouteBuilder extends RouteBuilder
{
public static final String JOB_QUEUE = "vm:jobQueue?size=50&timeout=1000000&concurrentConsumers=1";
public static final String FEED_GENERATION_JOB_QUEUE = "vm:feedGenerationJobQueue?size=50&timeout=1000000&concurrentConsumers=1";
public static final String DIRECT_FEED1 = "direct:feed1";
public static final String DIRECT_ERROR = "direct:error";
public static final String DIRECT_FEED2 = "direct:feed2";
public static final String JOB_TYPE_HEADER = "jobTypeHeader";
public static final String CONTROLLABLE_JOB_TYPE_HEADER = "controllableJobTypeHeader";
private MyAppService myAppService;
public JobHandlingRouteBuilder(MyAppService myAppService)
{
super();
this.myAppService = myAppService;
}
@Override
public void configure() throws Exception
{
//build routes
configureJobHandlingRoute();
configureFeedGenerationJobHandlingRoute();
}
/**
* Main route to handling all jobs
*/
private void configureJobHandlingRoute()
{
//from the in memory job queue move it to the in memory feed generation queue
from(JOB_QUEUE)
.routeId(JOB_QUEUE)
.choice()
.when((header(JOB_TYPE_HEADER).isEqualTo(JobType.FEED_GENERATION)))
.log(INFO, "Handling Feed Generation Job")
.to(FEED_GENERATION_JOB_QUEUE);
}
/**
* Route to handle Feed Generation jobs
*/
public void configureFeedGenerationJobHandlingRoute()
{
// @formatter:off
from(FEED_GENERATION_JOB_QUEUE)
.routeId(FEED_GENERATION_JOB_QUEUE)
.multicast() //multiple destinations
.parallelProcessing() //multiple threads
.to(DIRECT_FEED2, DIRECT_FEED1)
.end();
//TODO handle exception and mark the status as FAILED
from(DIRECT_FEED2)
.routeId(DIRECT_FEED2)
.setHeader(CONTROLLABLE_JOB_TYPE_HEADER, simple(ControllableJobType.POS_FEED.toString()))
.doTry()
.bean(myAppService, "generateFeed2") //invokes generateFeed2 method on myAppService bean
.doCatch(Exception.class)
.to(DIRECT_ERROR)
.end();
from(DIRECT_FEED1)
.routeId(DIRECT_FEED1)
.setHeader(CONTROLLABLE_JOB_TYPE_HEADER, simple(ControllableJobType.CASH_FEED.toString()))
.doTry()
.bean(myAppService, "generateFeed1") //invokes generateFeed2 method on myAppService bean
.doCatch(Exception.class)
.to(DIRECT_ERROR)
.end();
//Handle failtures
from(DIRECT_ERROR)
.routeId(DIRECT_ERROR)
.log(INFO, " ${headers.controllableJobTypeHeader} Job failed, reason: ${exception.stacktrace}")
.bean(cashForecastService, "markControllableJobFailed")
.end();
}
}
If you want to send email notification on error, the routes can be enhanced as shown below.
Firstly, add camel mail component.
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-mail</artifactId>
<version>${camel.version}</version>
</dependency>
public static final String DIRECT_EMAIL_NOTIFICATION = "direct:emailNotification";
public static final String NOTIFICATION_FLAG_HEADER = "notificationFlagHeader";
public static final String CREATION_FAILURE_EVENT_SUBJECT = "Failed Feed Generation";
private static final String LOG_URI = "log:" + AbstractCommonRouteBuilder.class.getPackage().getName()
+ "?level=ERROR";
//....
@Override
public void doConfigure()
{
addPropertiesLocation("classpath:cash/cashforecast.properties");
}
//....
from(DIRECT_FEED1)
.routeId(DIRECT_FEED1)
.setHeader(CONTROLLABLE_JOB_TYPE_HEADER, simple(ControllableJobType.CASH_FEED.toString()))
.doTry()
.bean(myAppService, "produceGroupLevelCashFeed")
.doCatch(Exception.class)
.to(DIRECT_ERROR)
.end();
//Handle failtures
from(DIRECT_ERROR)
.routeId(DIRECT_ERROR)
.log(INFO, " ${headers.controllableJobTypeHeader} Job failed, reason: ${exception.stacktrace}")
.bean(myApp, "handleControllableJobFailed")
.to(DIRECT_EMAIL_NOTIFICATION)
.end();
from(DIRECT_EMAIL_NOTIFICATION)
.routeId(DIRECT_EMAIL_NOTIFICATION)
.setHeader(NOTIFICATION_FLAG_HEADER, simple("{{myapp.feed.enable.email.notification}}"))
.choice()
.when(header(NOTIFICATION_FLAG_HEADER).isEqualTo(true))
.log(INFO, "Sending email notification on Failture")
.setBody(simple("${headers.controllableJobTypeHeader} : ${exception.message}"))
.to("smtp:{{myapp.mail.host}}?contentType=text/html&to={{myapp.feed.notification.recipient}}&from={{myapp.feed.notification.sender}}"
+ "&subject="
+ CREATION_FAILURE_EVENT_SUBJECT
+ "&mail.smtp.auth=false&mail.smtp.starttls.enable=false&delete=true&mapMailMessage=false");
//...
/*
* TODO: The better approach would be to use
* BridgePropertyPlaceholderConfigurer so that it picks up @PropertySource
* style configuration
*/
protected void addPropertiesLocation(String... newLocations)
{
PropertiesComponent properties = getPropertiesComponent();
String[] locationsArr = properties.getLocations();
Listlocations = new ArrayList ();
if (locationsArr != null)
{
for (String location : locationsArr)
{
locations.add(location);
}
}
for (String location : newLocations)
{
locations.add(location);
}
locationsArr = new String[locations.size()];
locationsArr = locations.toArray(locationsArr);
properties.setLocations(locationsArr);
}
The route is defined using MVEL, which is a powerful expression language for Java-based applications. You can also appreciate, how easy it is to build your routes using various protocols.