Apache Beam: easily implement a backoff policy in your DoFn

In Apache Beam, DoFn is your Swiss Army knife: when the SDK doesn’t provide a PTransform or composite transform that fits your need, you can write your own function.

DoFn?

A DoFn applies your logic to each element of the input PCollection and lets you populate the elements of an output PCollection. To be included in your pipeline, it’s wrapped in a ParDo PTransform.

For instance, you can transform elements using a DoFn:

pipeline
    .apply("ReadFromJms", JmsIO.read().withConnectionFactory(CF).withQueue("city"))
    .apply("TransformJmsRecordAsPojo", ParDo.of(new DoFn<JmsRecord, MyCityPojo>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // the JMS payload is the raw message text
            String payload = c.element().getPayload();
            MyCityPojo city = new MyCityPojo(payload);
            c.output(city);
        }
    }));

We can see here the core method of DoFn: processElement() annotated with @ProcessElement.

If you have to deal with resources, other annotated methods allow you to tweak the DoFn lifecycle:

  1. @Setup is called when the DoFn is created on the worker.
  2. @StartBundle is called when the runner starts a bundle of data. The bundle size depends on the runner.
  3. @FinishBundle is called when all elements in the bundle have been processed.
  4. @Teardown is called when the DoFn is stopped on the worker.
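
The call order described above can be pictured with a small plain-Java simulation. This is not Beam code: `LifecycleOrderSketch` and `RecordingFn` are names made up for this illustration, and the driver loop only mimics what a runner does for one DoFn instance that handles several bundles. A real runner decides by itself how many instances it creates and how it cuts the data into bundles.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (not Beam code) of the DoFn lifecycle call order. */
final class LifecycleOrderSketch {

    /** Hypothetical stand-in for a DoFn: each method records that it was called. */
    static final class RecordingFn {
        final List<String> calls = new ArrayList<>();

        void setup() { calls.add("setup"); }                       // role of @Setup
        void startBundle() { calls.add("startBundle"); }           // role of @StartBundle
        void processElement(String e) { calls.add("process:" + e); } // role of @ProcessElement
        void finishBundle() { calls.add("finishBundle"); }         // role of @FinishBundle
        void teardown() { calls.add("teardown"); }                 // role of @Teardown
    }

    /** Drives one instance through the given bundles and returns the recorded calls. */
    static List<String> run(List<List<String>> bundles) {
        RecordingFn fn = new RecordingFn();
        fn.setup();
        for (List<String> bundle : bundles) {
            fn.startBundle();
            for (String element : bundle) {
                fn.processElement(element);
            }
            fn.finishBundle();
        }
        fn.teardown();
        return fn.calls;
    }

    public static void main(String[] args) {
        // Two bundles: the first with two elements, the second with one.
        System.out.println(run(List.of(List.of("Paris", "Rome"), List.of("Oslo"))));
    }
}
```

The point to take away is that setup and teardown bracket the whole life of the instance, while startBundle and finishBundle bracket each bundle. Costly resources such as connections therefore belong in @Setup, and per-bundle buffers in @StartBundle.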

Adding Backoff Policy

Now, if an exception occurs in your @ProcessElement method, the whole current bundle fails. What happens next depends on the runner: it may retry the bundle from the start, or fail the pipeline.

So, you might want to implement a retry strategy, replaying the data processing logic.

Apache Beam provides utilities that make such a retry policy easy to implement: this is what we call a backoff policy.

FluentBackoff is a convenient utility class where you can define a retry policy with:

  • an initial backoff delay between retries (the delay grows at each attempt)
  • a maximum number of retries
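
To get a feel for the delays such a policy produces, here is a plain-Java sketch (no Beam dependency) that lists the nominal wait before each retry. It assumes a growth factor of 1.5, which to my knowledge is FluentBackoff's default exponent, and it ignores the randomization and the maximum-delay caps that the real class applies. `BackoffScheduleSketch` and `nominalDelaysMillis` are names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of an exponential backoff schedule (not the Beam implementation). */
final class BackoffScheduleSketch {

    /**
     * Returns the nominal delay, in milliseconds, to wait before each retry.
     * Simplification: no random jitter and no upper bound on the delay.
     */
    static List<Long> nominalDelaysMillis(long initialMillis, double exponent, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        double current = initialMillis;
        for (int retry = 0; retry < maxRetries; retry++) {
            delays.add(Math.round(current));
            current *= exponent; // assumed growth factor applied after each retry
        }
        return delays;
    }

    public static void main(String[] args) {
        // 5 retries, starting at 5 seconds, each delay 1.5 times the previous one.
        System.out.println(nominalDelaysMillis(5_000, 1.5, 5));
    }
}
```

With an initial delay of 5 seconds, the first retries wait roughly 5 s, 7.5 s and 11.25 s, so five retries can keep one element busy for about a minute. Keep that in mind when choosing the initial delay and the number of retries.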

To illustrate FluentBackoff, let’s create a DoFn that uses it:

import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;

public class DoFnWithBackoff extends DoFn<JmsRecord, MyCityPojo> {

  private static final FluentBackoff BACKOFF =
      FluentBackoff.DEFAULT
          .withMaxRetries(5)
          .withInitialBackoff(Duration.standardSeconds(5));

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    Sleeper sleeper = Sleeper.DEFAULT;
    BackOff backoff = BACKOFF.backoff(); // fresh backoff state for each element
    while (true) {
      try {
        String payload = c.element().getPayload();
        MyCityPojo city = new MyCityPojo(payload); // can raise an exception
        c.output(city);
        return; // success: leave the retry loop
      } catch (Exception e) {
        if (e instanceof BusinessException) {
          // in case of custom business exception, we don't retry
          throw e;
        }
        // retry, changing the content if required
        if (!BackOffUtils.next(sleeper, backoff)) {
          // we retried the max number of times
          throw e;
        }
      }
    }
  }
}
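
If you want to try the control flow of that loop without a Beam runtime, it can be sketched in plain Java. Everything here is hypothetical and only mirrors the pattern: `RetryLoopSketch`, `getWithRetries` and the nested `BusinessException` are names made up for this illustration, the 1.5 growth factor is an assumption, and a plain `Thread.sleep` stands in for Beam's Sleeper and BackOff.

```java
import java.util.function.Supplier;

/** Plain-Java sketch of a retry loop with backoff (not the Beam implementation). */
final class RetryLoopSketch {

    /** Hypothetical marker for errors that must never be retried. */
    static final class BusinessException extends RuntimeException {
        BusinessException(String message) { super(message); }
    }

    /**
     * Runs the task and retries it on any RuntimeException except BusinessException.
     * After maxRetries retries, the last exception is rethrown.
     */
    static <T> T getWithRetries(Supplier<T> task, int maxRetries, long initialDelayMillis) {
        long delay = initialDelayMillis;
        int retries = 0;
        while (true) {
            try {
                return task.get(); // success ends the loop
            } catch (BusinessException e) {
                throw e; // not retryable
            } catch (RuntimeException e) {
                if (retries >= maxRetries) {
                    throw e; // retry budget exhausted
                }
                retries++;
                sleep(delay);
                delay = delay * 3 / 2; // assumed growth factor of 1.5
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            throw new IllegalStateException("interrupted while backing off", ie);
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Fails twice with a transient error, then succeeds on the third attempt.
        String city = getWithRetries(() -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "Paris";
        }, 5, 10);
        System.out.println(city + " after " + attempts[0] + " attempts");
    }
}
```

Two details matter in both versions: the loop must exit as soon as the call succeeds, otherwise the element is emitted again and again, and non-retryable errors must be rethrown before any sleep happens.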

The Apache Beam backoff utilities give you complete control, so you can implement your own retry policy.
