Apache Beam: easily implement backoff policy in your DoFn
In Apache Beam, DoFn is your Swiss Army knife: when the SDK doesn't provide an existing PTransform or composite transform for what you need, you can create your own function.
DoFn?
A DoFn applies your logic to each element of the input PCollection and lets you populate the elements of an output PCollection. To be included in your pipeline, it is wrapped in a ParDo PTransform.
For instance, you can transform each element using a DoFn:
```java
pipeline
    .apply("ReadFromJms", JmsIO.read().withConnectionFactory(CF).withQueue("city"))
    .apply("TransformJmsRecordAsPojo", ParDo.of(new DoFn<JmsRecord, MyCityPojo>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // convert the JMS message payload into our own POJO
            String payload = c.element().getPayload();
            MyCityPojo city = new MyCityPojo(payload);
            c.output(city);
        }
    }));
```
Here we can see the core method of a DoFn: processElement(), annotated with @ProcessElement.
If you have to deal with resources, other annotated methods let you hook into the DoFn lifecycle:
- @Setup is called when the DoFn is created on the worker.
- @StartBundle is called when the runner starts a bundle of data. The bundle size depends on the runner.
- @FinishBundle is called when all elements in the bundle have been processed.
- @Teardown is called when the DoFn is stopped on the worker.
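To make the call order concrete, here is a toy model in plain Java with no Beam dependency: a hypothetical LifecycleDemo class whose methods stand in for the annotated ones, driven by a small hand-written loop that plays the runner's role for two bundles. It is only a sketch of the order in which these methods are expected to run on one DoFn instance; a real runner decides the bundle boundaries, may reuse an instance for many bundles, and treats @Teardown as best effort.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the DoFn lifecycle (no Beam dependency). The method names mirror
// the Beam annotations, but this class and runBundles() are hypothetical.
public class LifecycleDemo {
    final List<String> events = new ArrayList<>();

    void setup()        { events.add("setup"); }        // like @Setup: open clients, pools...
    void startBundle()  { events.add("startBundle"); }  // like @StartBundle: reset per-bundle state
    void processElement(String element) { events.add("process:" + element); } // like @ProcessElement
    void finishBundle() { events.add("finishBundle"); } // like @FinishBundle: flush buffered output
    void teardown()     { events.add("teardown"); }     // like @Teardown: release resources

    // Plays the runner's role: one setup, each bundle wrapped in
    // startBundle/finishBundle, then a final teardown.
    static LifecycleDemo runBundles(List<List<String>> bundles) {
        LifecycleDemo fn = new LifecycleDemo();
        fn.setup();
        for (List<String> bundle : bundles) {
            fn.startBundle();
            for (String element : bundle) {
                fn.processElement(element);
            }
            fn.finishBundle();
        }
        fn.teardown();
        return fn;
    }

    public static void main(String[] args) {
        LifecycleDemo fn = runBundles(List.of(List.of("Paris", "Lyon"), List.of("Nice")));
        // prints [setup, startBundle, process:Paris, process:Lyon, finishBundle, startBundle, process:Nice, finishBundle, teardown]
        System.out.println(fn.events);
    }
}
```

The point to notice is that setup and teardown bracket the whole life of the instance, while startBundle and finishBundle bracket each bundle, which is why expensive resources usually belong in @Setup.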
Adding a Backoff Policy
Now, if an exception is thrown in your @ProcessElement method, the whole bundle fails and its output is discarded; whether and how the bundle is retried depends on the runner.
So you might want to implement your own retry strategy inside the DoFn, replaying the processing logic for the failing element.
Apache Beam provides utilities that make this retry strategy easy to implement: this is what we call a backoff policy.
FluentBackoff is a convenient utility class in which you can define a retry policy with:
- an initial backoff delay between retries (the delay grows at each attempt)
- a maximum number of retries
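To get a feel for how the delay grows, the following plain-Java sketch computes the nominal wait before each retry. It assumes an exponential growth factor of 1.5 (as far as I know, FluentBackoff's default exponent; check the javadoc of your Beam version) and ignores the random jitter and the maximum-backoff cap that FluentBackoff also applies. BackoffSchedule and delaysMillis are hypothetical names; this is arithmetic, not Beam's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: nominal backoff schedule, delay_n = initial * exponent^n.
// Ignores the jitter and caps that a real FluentBackoff also applies.
public class BackoffSchedule {
    static List<Long> delaysMillis(long initialMillis, double exponent, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        double current = initialMillis;
        for (int retry = 0; retry < maxRetries; retry++) {
            delays.add((long) current); // truncate to whole milliseconds
            current *= exponent;        // grow the delay for the next attempt
        }
        return delays;
    }

    public static void main(String[] args) {
        // 5-second initial backoff and 5 retries, as in the DoFn below
        System.out.println(delaysMillis(5_000, 1.5, 5));
        // prints [5000, 7500, 11250, 16875, 25312]
    }
}
```

Under these assumptions, five retries can add about 66 seconds of sleeping (5000 + 7500 + 11250 + 16875 + 25312 ms) to a single element before jitter, which is worth keeping in mind for bundle latency.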
To illustrate, let's create a DoFn that uses it:
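```java
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;

public class DoFnWithBackoff extends DoFn<JmsRecord, MyCityPojo> {

    // up to 5 retries, starting with a 5-second delay that grows at each attempt
    private static final FluentBackoff BACKOFF = FluentBackoff.DEFAULT
            .withMaxRetries(5)
            .withInitialBackoff(Duration.standardSeconds(5));

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        Sleeper sleeper = Sleeper.DEFAULT;
        BackOff backoff = BACKOFF.backoff(); // fresh backoff state for each element
        while (true) {
            try {
                String payload = c.element().getPayload();
                MyCityPojo city = new MyCityPojo(payload); // can raise an exception
                c.output(city);
                return; // success: stop retrying
            } catch (Exception e) {
                if (e instanceof BusinessException) {
                    // custom business exception: not transient, so we don't retry
                    throw e;
                }
                // sleeps before the next attempt; returns false once retries are exhausted
                if (!BackOffUtils.next(sleeper, backoff)) {
                    throw e;
                }
            }
        }
    }
}
```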
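The retry loop itself does not depend on Beam, so one way to unit test it is to pull it out behind small interfaces. The sketch below is a plain-Java model of the same pattern, assuming a hypothetical Backoff interface (loosely modeled on Beam's BackOff, where a negative value means "stop") and an injectable sleeper so tests don't actually wait. RetryLoop, NonRetryableException, fixedBackoff and callWithRetries are illustrative names, not Beam classes.

```java
import java.util.concurrent.Callable;

// Plain-Java model of the retry loop used in DoFnWithBackoff (no Beam dependency).
public class RetryLoop {
    /** Hypothetical stand-in for a backoff policy: next delay in ms, or -1 to stop. */
    interface Backoff { long nextBackOffMillis(); }

    /** Hypothetical stand-in for a sleeper, injectable so tests don't really sleep. */
    interface Sleeper { void sleep(long millis) throws InterruptedException; }

    /** Hypothetical non-transient (business) error: never retried. */
    static class NonRetryableException extends RuntimeException {
        NonRetryableException(String msg) { super(msg); }
    }

    /** Allows maxRetries retries, each after the same fixed delay. */
    static Backoff fixedBackoff(long delayMillis, int maxRetries) {
        return new Backoff() {
            private int retries = 0;
            public long nextBackOffMillis() { return retries++ < maxRetries ? delayMillis : -1; }
        };
    }

    static <T> T callWithRetries(Callable<T> work, Backoff backoff, Sleeper sleeper) throws Exception {
        while (true) {
            try {
                return work.call();       // success: leave the loop
            } catch (NonRetryableException e) {
                throw e;                  // business error: fail immediately
            } catch (Exception e) {
                long delay = backoff.nextBackOffMillis();
                if (delay < 0) {
                    throw e;              // retries exhausted: propagate the last error
                }
                sleeper.sleep(delay);     // wait, then try again
            }
        }
    }
}
```

Note the explicit return on success: in a while (true) retry loop, forgetting it would make the DoFn reprocess and re-output the same element forever.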
The Apache Beam backoff utilities give you complete control: you choose which exceptions are retried, how long to wait and how many times to try, and you can implement your own retry policy.