Apache Beam in action: same code, several execution engines

The previous article was an introduction to Apache Beam; it's now time to look at some of its key features. The timing is perfect, as Apache Beam 0.2.0-incubating has just been released.

This article presents a first pipeline use case and executes the same pipeline code on different execution engines.

Context: GDELT analysis

For this article, we are going to create a pipeline to analyse GDELT data and count the number of events per location in the world.

The GDELT project gathers events happening around the world and publishes daily CSV files, with one line per event.

For instance, an event looks like this:

545037848       20150530        201505  2015    2015.4110                                                                                       JPN     TOKYO   JPN                                                             1       046     046     04      1       7.0     15      1       15      -1.06163552535792       0                                                       4       Tokyo, Tokyo, Japan     JA      JA40    35.685  139.751 -246227 4       Tokyo, Tokyo, Japan     JA      JA40    35.685  139.751 -246227 20160529        http://deadline.com/print-article/1201764227/

The purpose here is to take each event (each line), extract its location code ("JPN" in the example above), and group the events by location. We can then simply count the number of events per location.
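
As a standalone illustration (this snippet is not part of the pipeline, and eventLine is just a hypothetical variable holding one CSV line), splitting a line on tabs could look like this:

// Split the event line on tabs. Note that split("\\t+") collapses consecutive tabs,
// so empty columns are dropped and the indexes don't exactly match the official
// GDELT column numbers.
String[] fields = eventLine.split("\\t+");
String location = fields.length > 22 ? fields[21] : "NA";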

The location of the CSV file should be an option of the pipeline.

First, we create a “wrapper” class. It will contain the pipeline options definition and a main method that runs the pipeline:

public class EventsByLocation {}

Inside this class, let's create an inner interface to describe the pipeline options:

    private interface Options extends PipelineOptions {

        String GDELT_EVENTS_URL = "http://data.gdeltproject.org/events/";

        @Description("GDELT file date")
        @Default.InstanceFactory(GDELTFileFactory.class)
        String getDate();
        void setDate(String value);

        @Description("Input Path")
        String getInput();
        void setInput(String value);

        @Description("Output Path")
        String getOutput();
        void setOutput(String value);

        class GDELTFileFactory implements DefaultValueFactory<String> {
            public String create(PipelineOptions options) {
                SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
                return format.format(new Date());
            }
        }

    }

Our Options interface simply extends PipelineOptions. We use the annotations provided by Beam to describe each option (description, default value, …).
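
For example (with hypothetical values), these options can be overridden as program arguments when launching the pipeline; any option left out falls back to its default:

--date=20160529 --input=/path/to/20160529.export.CSV.zip --output=/tmp/gdelt-20160529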

Now, we add the main method that will build the actual pipeline and run it:

    public static void main(String[] args) throws Exception {
    }

First, we load the pipeline options using our options interface:

        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        if (options.getInput() == null) {
            options.setInput(Options.GDELT_EVENTS_URL + options.getDate() + ".export.CSV.zip");
        }
        if (options.getOutput() == null) {
            options.setOutput("/tmp/gdelt-" + options.getDate());
        }

We can now create the pipeline using these options:

Pipeline pipeline = Pipeline.create(options);

Let's now create the different steps in our pipeline.

The first step is a source: we read the GDELT CSV file using TextIO.

.apply("GDELTFile", TextIO.Read.from(options.getInput()))

TextIO gives us a PCollection of lines (String) contained in the file.

We now add a second step: a DoFn that parses and splits each line to extract the location.

                .apply("ExtractLocation", ParDo.of(new DoFn<String, String>() {                    @ProcessElement                    public void processElement(ProcessContext c) {                        String[] fields = c.element().split("\\t+");                        if (fields.length > 22) {                            if (fields[21].length() > 2) {                                c.output(fields[21].substring(0, 1));                            } else {                                c.output(fields[21]);                            }                        } else {                            c.output("NA");                        }                    }                }))

We now have a PCollection of locations. Before counting, we clean up and filter out malformed locations:

                .apply("Filtering", Filter.by(new SerializableFunction<String, Boolean>() {                    public Boolean apply(String input) {                        if (input.equals("NA")) {                            return false;                        }                        if (input.startsWith("-")) {                            return false;                        }                        if (input.length() != 2) {                            return false;                        }                        return true;                    }                }))

We can now count the events per location, simply by counting identical location elements in the PCollection:

.apply("CountPerLocation", Count.<String>perElement())

We now have a PCollection of location/count pairs. The next step formats it as a PCollection of Strings, using the “location: count” format:

                .apply("StringFormat", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {                    public String apply(KV<String, Long> input) {                        return input.getKey() + ": " + input.getValue();                    }                }))

The final step writes the result in an output file:

.apply("Results", TextIO.Write.to(options.getOutput()));

Our pipeline is ready to be run:

pipeline.run();

Finally, our class looks like:

package org.apache.beam.samples;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;

public class EventsByLocation {

    private static final Logger LOG = LoggerFactory.getLogger(EventsByLocation.class);

    /**
     * Specific pipeline options.
     */
    private interface Options extends PipelineOptions {

        String GDELT_EVENTS_URL = "http://data.gdeltproject.org/events/";

        @Description("GDELT file date")
        @Default.InstanceFactory(GDELTFileFactory.class)
        String getDate();
        void setDate(String value);

        @Description("Input Path")
        String getInput();
        void setInput(String value);

        @Description("Output Path")
        String getOutput();
        void setOutput(String value);

        class GDELTFileFactory implements DefaultValueFactory<String> {
            public String create(PipelineOptions options) {
                SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
                return format.format(new Date());
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        if (options.getInput() == null) {
            options.setInput(Options.GDELT_EVENTS_URL + options.getDate() + ".export.CSV.zip");
        }
        if (options.getOutput() == null) {
            options.setOutput("/tmp/gdelt-" + options.getDate());
        }
        LOG.info(options.toString());

        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply("GDELTFile", TextIO.Read.from(options.getInput()))
                .apply("ExtractLocation", ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        String[] fields = c.element().split("\\t+");
                        if (fields.length > 22) {
                            if (fields[21].length() > 2) {
                                c.output(fields[21].substring(0, 2));
                            } else {
                                c.output(fields[21]);
                            }
                        } else {
                            c.output("NA");
                        }
                    }
                }))
                .apply("Filtering", Filter.by(new SerializableFunction<String, Boolean>() {
                    public Boolean apply(String input) {
                        if (input.equals("NA")) {
                            return false;
                        }
                        if (input.startsWith("-")) {
                            return false;
                        }
                        if (input.length() != 2) {
                            return false;
                        }
                        return true;
                    }
                }))
                .apply("CountPerLocation", Count.<String>perElement())
                .apply("StringFormat", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                    public String apply(KV<String, Long> input) {
                        return input.getKey() + ": " + input.getValue();
                    }
                }))
                .apply("Results", TextIO.Write.to(options.getOutput()));

        pipeline.run();
    }

}

Execution engines abstraction

In order to build and execute our pipeline, we package our class in a Maven project.

This Maven project is pretty simple as we need only two dependencies:

  • the Apache Beam Java SDK itself
  • SLF4J, as we use a logger
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.beam.samples</groupId>
    <artifactId>EventsByLocation</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>0.2.0-incubating</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.13</version>
        </dependency>
    </dependencies>

</project>

Note that we didn’t define anything related to the runner.

Here, we want to show one Beam feature: the same code running on different execution runtimes.

For that, we use Maven profiles: each profile defines the dependency set of a specific runner. We can then execute our pipeline (exactly the same code) on a target runner, simply by specifying a program argument that identifies the runner.

Direct runner

Let's start with the Direct runner. This is the preferred runner for testing: it executes the pipeline with several threads inside the local JVM.

It's pretty easy to use, as it only requires a single dependency. So we create a Maven profile with the Direct runner dependency:

        <profile>
            <id>direct-runner</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.beam</groupId>
                    <artifactId>beam-runners-direct-java</artifactId>
                    <version>0.2.0-incubating</version>
                </dependency>
            </dependencies>
        </profile>

We can now run our pipeline on this runner. For that, we use the direct-runner profile and pass --runner=DirectRunner as a program argument:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.samples.EventsByLocation -Pdirect-runner -Dexec.args="--runner=DirectRunner --input=/home/dataset/gdelt/2014-2016/201605*.zip --output=/tmp/gdelt/output/"

Spark runner

The Spark runner requires more dependencies (due to the Apache Spark runtime). So, again, we create a Maven profile to define them: the Beam Spark runner, the Spark engine itself, and the Jackson dependencies it requires.

        <profile>
            <id>spark-runner</id>
            <dependencies>
                <dependency>
                    <groupId>org.apache.beam</groupId>
                    <artifactId>beam-runners-spark</artifactId>
                    <version>0.2.0-incubating</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.10</artifactId>
                    <version>1.6.2</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-streaming_2.10</artifactId>
                    <version>1.6.2</version>
                </dependency>
                <dependency>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                    <version>2.7.2</version>
                </dependency>
                <dependency>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-annotations</artifactId>
                    <version>2.7.2</version>
                </dependency>
                <dependency>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                    <version>2.7.2</version>
                </dependency>
                <dependency>
                    <groupId>com.fasterxml.jackson.module</groupId>
                    <artifactId>jackson-module-scala_2.10</artifactId>
                    <version>2.7.2</version>
                </dependency>
            </dependencies>
        </profile>

As with the Direct runner, we use the spark-runner profile and the --runner=SparkRunner argument to execute our pipeline on Spark. Basically, it performs the equivalent of a spark-submit:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.samples.EventsByLocation -Pspark-runner -Dexec.args="--runner=SparkRunner --input=/home/dataset/gdelt/2014-2016/201605*.zip --output=/tmp/gdelt/output/"

Flink runner

The Flink runner is easier to use than the Spark runner, as it's packaged as a shaded jar embedding all required dependencies (including Flink itself).

So basically, the flink-runner profile contains a single dependency:

        <profile>
            <id>flink-runner</id>
            <dependencies>
                <dependency>
                    <groupId>org.apache.beam</groupId>
                    <artifactId>beam-runners-flink_2.10</artifactId>
                    <version>0.2.0-incubating</version>
                </dependency>
            </dependencies>
        </profile>

As before, we can run our pipeline on a Flink cluster using the flink-runner profile and the --runner=FlinkRunner argument:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.samples.EventsByLocation -Pflink-runner -Dexec.args="--runner=FlinkRunner --input=/home/dataset/gdelt/2014-2016/201605*.zip --output=/tmp/gdelt/output/"

Google Dataflow runner

Finally, we can run our pipeline on the Google Cloud Dataflow platform, leveraging the features it provides (such as dynamic scaling).

The Google Cloud Dataflow runner is easy to use as well: like the Flink runner, everything is packaged in a shaded jar, so the google-cloud-dataflow-runner profile just defines one dependency:

        <profile>
            <id>google-cloud-dataflow-runner</id>
            <dependencies>
                <dependency>
                    <groupId>org.apache.beam</groupId>
                    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
                    <version>0.2.0-incubating</version>
                </dependency>
            </dependencies>
        </profile>

As usual, using the google-cloud-dataflow-runner profile and the --runner=DataflowRunner argument, our pipeline is executed on the Google Cloud Dataflow platform:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.samples.EventsByLocation -Pgoogle-cloud-dataflow-runner -Dexec.args="--runner=DataflowRunner --input=/home/dataset/gdelt/2014-2016/201605*.zip --output=/tmp/gdelt/output/"
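
Note that, in practice, the Dataflow runner also expects some Google Cloud specific options (typically a project id and a Cloud Storage staging location), and the input/output paths have to be reachable from the Dataflow service. For example, with hypothetical project and bucket names, the command could look like:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.samples.EventsByLocation -Pgoogle-cloud-dataflow-runner -Dexec.args="--runner=DataflowRunner --project=my-gcp-project --stagingLocation=gs://my-bucket/staging/ --input=gs://my-bucket/gdelt/201605*.zip --output=gs://my-bucket/gdelt/output/"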

Conclusion

In this article, we saw how to execute exactly the same code (with no change at all in the pipeline definition) on different execution engines. We can easily switch from one engine to another just by changing the Maven profile and the runner argument.

In a next article, we will take a deeper look at the Beam IOs: the concepts (sources, sinks, watermarks, splits, …) and how to use and write a custom IO.
