Use Camel, CXF and Karaf to implement batches

Introduction

Apache Camel has not be designed to be used for implementing batch tasks.

For instance, if your Camel route has a consumer endpoint polling files in a directory, Camel will periodically and indefinitely monitor the folder and poll any new incoming files.
It’s not a batch behavior: in batch mode, we want to run the file polling on demand, at a certain time, launched by a batch scheduler like ControlM, $Universe or Tivoli Worksheet Scheduler.

However, there are several interesting points to use Camel for batch implementation. First, Camel provides a large set of components. A lot of batches read/write files, read from a JMS queues, write into JMS queues, etc. Usage of Camel components in a batch way is really valuable.
Second, Camel uses a DSL to describe the process executed by the routes. Especially, it supports “human readable” DSL like Spring XML or Blueprint XML. It means that it’s easy to review what the batch is doing, eventually change an endpoint definition, etc. Most of the time, batches are “black box”: you run it, and you only get a status code to know if it’s OK or not. With Camel, you have a look on the batch process.
Third, Camel is a highly plug and play framework. It means that it’s easy to replace an endpoint by another one. For instance, if your batch polls files in a folder currently, it’s very easy to change this to poll messages from a JMS queue. You don’t really have to re-implement the whole batch.

More over, tools like Talend ESB Studio provide an IDE to create and design your Camel routes.

In this article, we are going to see how to use Camel in a “batch way”.

Design

In fact, we are going to have two Camel routes:
– the first one is called “control”. This route will “expose” a REST service to start the batch. A bean in this route will be responsible to start the “batch” route.
– the second one is called “batch”. It’s the core implementation of our batch. It’s a “standard” route, but at the end, we have a processor that “stop” the route (to avoid to have the route up indefinitely). This route is not auto started as it will be controller by the first one.

It means that a simple HTTP client (like a browser or REST client) will start the batch, on-demand. Most of enterprise batch schedulers ship a component to make HTTP requests.

POM

Our batch will be packaged as an OSGi bundle. It will allow us to deploy the batch in an Apache Karaf OSGi container:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>net.nanthrax.examples</groupId>
  <artifactId>camel-batch</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>bundle</packaging>

  <properties>
    <camel.version>2.8.0</camel.version>
    <cxf.version>2.4.1</cxf.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-spring</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-cxf</artifactId>
      <version>${camel.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-frontend-jaxrs</artifactId>
      <version>${cxf.version}</version>
    </dependency>
    <,dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-transports-http</artifactId>
      <version>${cxf.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-transports-http-jetty</artifactId>
      <version>${cxf.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.felix</groupId>
        <artifactId>maven-bundle-plugin</artifactId>
        <version>2.3.4</version>
        <extensions>true</extensions>
        <configuration>
          <instructions>
            <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
            <Require-Bundle>org.apache.cxf.bundle,org.apache.camel.camel-cxf,org.springframework.beans</Require-Bundle>
          </instructions>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

In this POM, we can see:
– the packaging is an OSGi bundle. That’s why we use the Apache Felix maven-bundle-plugin. We name the bundle with the project artifactId, and we define Camel and CXF bundles as dependencies (Require-Bundle).
– in the dependency sets, we define the Camel components that we use (camel-core, camel-spring to use the Camel Spring XML DSL, and camel-cxf to use the CXF JAX-RS implementation) and the CXF dependency to be able to create a JAX-RS server.

Control route

The first Camel route is the “control” one. This route will bind a JAX-RS server, listening HTTP requests (consumer) and will start the “batch” route on-demand.

The route definition will be located in the META-INF/spring/routes.xml folder of our bundle. We use the Camel Spring XML DSL in this file:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:cxf="http://camel.apache.org/schema/cxf"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
  ">

  <cxf:rsServer id="rsServer" address="http://localhost:9090/batch"
serviceClass="net.nanthrax.examples.camel.batch.impl.ControllerService"/&t;

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="control">
      <from uri="cxfrs:bean:rsServer"/>
      <to uri="log:net.nanthrax.examples.camel.batch"/>
      <to uri="controllerBean"/>
    </route>
  </camelContext>

  <bean id="controllerBean" class="net.nanthrax.examples.camel.batch.impl.ControllerBean">
    <property name="routeId" value="batch"/>
  </bean>

</beans>

We use the Camel CXF to create the JAX-RS server (using <cxf:rsServer/gt;). This JAX-RS server will listen on the local machine on the 9090 port, and the context path is /batch.
To “describe” the REST service behavior, we define the ControllerService class in the serviceClass attribute.
The ControllerService class is just an “empty container”. The purpose is just to “describe” the REST service, not to process it:


package net.nanthrax.examples.camel.batch.impl;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

/**
* REST service implementation of the Camel batch service.
*/
@Path("/")
public class ControllerService {

  @GET
  @Path("/start")
  @Produces("text/plain")
  public String startRoute() throws Exception {
    // nothing to do, it's just a wrapper
    return null;
  }

}

We can see the JAX-RS annotations:
– the ControllerService REST Path is /, it means directly bound to the JAX-RS server context path.
– the startRoute() method will accept GET HTTP method, on the context path /start and it will produce pure text (text/plain).

The process itself will be performed in the controllerBean:


package net.nanthrax.examples.camel.batch.impl;

import org.apache.camel.CamelContext;
import org.apache.camel.Handler;

/**
* Camel controller bean involved in the starting routed
*/
public class ControllerBean {

  private String routeId;

  public String getRouteId() {
    return this.routeId;
  }

  public void setRouteId(String routeId) {
    this.routeId = routeId;
  }

  @Handler
  public String startRoute(CamelContext camelContext) throws Exception {
    camelContext.startRoute(routeId);
    return "Batch " + routeId + " started.";
  }

}

We inject the Camel route ID of the batch route: “batch”. The CamelContext is automatically injected by Camel. This bean is quite simple, as it only starts the “batch” route.

Batch route

This route contains the “batch logic”. You can use any kind of routes, components, Enterprise Integration Patterns, etc provided by Camel. The only specific parts are:
– the autoStartup attribute set to false, to avoid to start the route automatically at context bootstrap
– the final processor which stop the route after processing.

We gather the two routes in the same META-INF/spring/routes.xml file:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:cxf="http://camel.apache.org/schema/cxf"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    http://camel.apache.org/schema/cxf http://camel.apache.org/schema/cxf/camel-cxf.xsd
  ">

  <cxf:rsServer id="rsServer" address="http://localhost:9090/batch"
  serviceClass="net.nanthrax.examples.camel.batch.impl.ControllerService"/>

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="control">
      <from uri="cxfrs:bean:rsServer"/>
      <to uri="log:net.nanthrax.examples.camel.batch"/>
      <to uri="controllerBean"/>
    </route>
    <route id="batch" autoStartup="false">
      <from uri="file:/tmp"/>
      <to uri="file:output"/>
      <process ref="stopProcessor"/>
    </route>
  </camelContext>

  <bean id="controllerBean" class="net.nanthrax.examples.camel.batch.impl.ControllerBean">
    <property name="routeId" value="batch"/>
  </bean>

  <bean id="stopProcessor" class="net.nanthrax.examples.camel.batch.impl.StopProcessor">
    <property name="routeId" value="batch"/>
  </bean>

</beans>

In this example, the batch polls files in the /tmp folder, and copies into the output folder.

The StopProcessor is Camel processor (aka, it implements the Camel Processor interface). It stops the route after processing the incoming message (we inject the “batch” route ID using Spring):


package net.nanthrax.examples.camel.batch.impl;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

/**
* A Camel processor which stop routes.
*/
public class StopProcessor implements Processor {

  private String routeId;

  public String getRouteId() {
    return this.routeId;
  }

  public void setRouteId(String routeId) {
    this.routeId = routeId;
  }

  public void process(Exchange exchange) throws Exception {
    CamelContext camelContext = exchange.getContext();
    // remove myself from the in flight registry so we can stop this route without trouble
    camelContext.getInflightRepository().remove(exchange);
    // stop the route
    camelContext.stopRoute(routeId);
  }

}

Deployment and execution

Now, we can build our OSGi bundle, simply using:


mvn clean install

In a fresh Apache Karaf instance, we have first to install the CXF and Camel features:


karaf@root> features:addurl mvn:org.apache.cxf.karaf/apache-cxf/2.4.1/xml/features
karaf@root> features:install cxf
karaf@root> features:addurl mvn:org.apache.camel.karaf/apache-camel/2.8.0/xml/features
karaf@root> features:install camel-spring
karaf@root> features:install camel-cxf

Now, we can install our bundle:


karaf@root> osgi:install -s mvn:net.nanthrax.examples/camel-batch/1.0-SNAPSHOT

Our bundle appears as “created”:


karaf@root> la|grep -i batch
[ 134] [Active ] [ ] [Started] [ 60] camel-batch (1.0.0.SNAPSHOT)

Using a simple browser, we can access to http://localhost:9090/batch/start. The route is started (as a batch) and we can see in the browser:


Batch batch started.

Conclusion

Even if the first Camel route purpose is to be up and running all the time, we can use it in a more “batch” way. It allows developers to use the large set of Camel components, and be able to use all Enterprise Integration Patterns. For instance, the batch needs to copy a file, and after send an e-mail and a message into a JMS queue, it’s very easy using a recipient list. You have to send to a target endpoint depending of the content of the message, no problem using a Content Based Router.

You can run such kind of batches in Talend ESB. It’s an interesting addition to the Talend Data Integration products (ETL jobs, MDM, DQ, etc).

Comments

Popular posts from this blog

Getting started with Apache Karaf Minho

Exposing Apache Karaf configurations with Apache Arrow Flight

Using Apache Karaf with Kubernetes