Apache Beam: easily implement backoff policy in your DoFn
In Apache Beam, DoFn is your Swiss Army knife: when the SDK doesn't provide a PTransform or CompositeTransform that fits your need, you can create your own function.
DoFn?
A DoFn applies your logic to each element of the input PCollection and lets you populate the elements of an output PCollection. To be included in your pipeline, it is wrapped in a ParDo PTransform.
For instance, you can transform elements using a DoFn:
    pipeline
        .apply("ReadFromJms", JmsIO.read().withConnectionFactory(CF).withQueue("city"))
        .apply("TransformJmsRecordAsPojo", ParDo.of(new DoFn<JmsRecord, MyCityPojo>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // read the JMS payload and turn it into a POJO
                String payload = c.element().getPayload();
                MyCityPojo city = new MyCityPojo(payload);
                // emit the POJO to the output PCollection
                c.output(city);
            }
        }));

We can see here the core method of a DoFn: processElement(), annotated with @ProcessElement.
If you have to deal with resources, other annotated methods allow you to tweak the DoFn lifecycle:
- @Setup is called when the DoFn is created on the worker.
- @StartBundle is called when the runner starts a bundle of data. The bundle size depends on the runner.
- @FinishBundle is called when all elements in the bundle have been processed.
- @Teardown is called when the DoFn is stopped on the worker.
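The call order described above can be pictured with a small plain-Java simulation. It does not use Beam: `LifecycleSketch` and its methods are hypothetical stand-ins for the annotated methods, and the bundle boundaries are hard-coded here, whereas a real runner chooses them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java simulation of the DoFn lifecycle order (hypothetical, not Beam code). */
class LifecycleSketch {

  // records every lifecycle call in the order it happens
  final List<String> calls = new ArrayList<>();

  void setup() { calls.add("setup"); }                  // stands in for @Setup
  void startBundle() { calls.add("startBundle"); }      // stands in for @StartBundle
  void processElement(String element) { calls.add("process:" + element); }
  void finishBundle() { calls.add("finishBundle"); }    // stands in for @FinishBundle
  void teardown() { calls.add("teardown"); }            // stands in for @Teardown

  /** Plays the role of the runner: one instance, several bundles. */
  static List<String> run(List<List<String>> bundles) {
    LifecycleSketch fn = new LifecycleSketch();
    fn.setup();
    for (List<String> bundle : bundles) {
      fn.startBundle();
      for (String element : bundle) {
        fn.processElement(element);
      }
      fn.finishBundle();
    }
    fn.teardown();
    return fn.calls;
  }

  public static void main(String[] args) {
    // two bundles: the first with two elements, the second with one
    System.out.println(run(Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"))));
  }
}
```

The point of the sketch is that setup and teardown happen once per instance, while the bundle methods wrap each group of elements. That is why expensive resources usually belong in setup rather than in the per-element method.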
Adding Backoff Policy
Now, if an exception is thrown in your @ProcessElement method, the current bundle fails: it is invalidated and its elements are not processed.
So, you might want to implement a retry strategy that replays the data processing logic.
Apache Beam provides utilities that make such a retry policy easy to implement: this is what we call a backoff policy.
FluentBackoff is a convenient utility class that lets you define a retry policy with:
- an initial backoff delay between retries (the delay grows at each attempt)
- a maximum number of retries
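To get a feel for how these two settings interact, here is a minimal plain-Java sketch of a growing delay schedule. It is not Beam's implementation: `BackoffScheduleSketch`, `delaysMillis` and the growth factor of 1.5 are assumptions chosen for illustration, and a production backoff typically also caps the delay and adds some randomness.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of a growing backoff schedule (hypothetical, not Beam code). */
class BackoffScheduleSketch {

  /**
   * Returns the delay to wait before each retry, in milliseconds.
   * initialMillis, factor and maxRetries are illustrative parameters.
   */
  static List<Long> delaysMillis(long initialMillis, double factor, int maxRetries) {
    List<Long> delays = new ArrayList<>();
    double current = initialMillis;
    for (int retry = 0; retry < maxRetries; retry++) {
      delays.add(Math.round(current));
      current *= factor; // each retry waits longer than the previous one
    }
    return delays;
  }

  public static void main(String[] args) {
    // 5 retries starting at 5 seconds; the factor 1.5 is an assumption
    System.out.println(delaysMillis(5_000, 1.5, 5));
  }
}
```

With a factor above 1, the total time spent waiting grows quickly with the number of retries, so the maximum number of retries effectively bounds how long one element can hold up its bundle.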
To illustrate, let's create a DoFn that uses it:

    import org.apache.beam.sdk.io.jms.JmsRecord;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.util.BackOff;
    import org.apache.beam.sdk.util.BackOffUtils;
    import org.apache.beam.sdk.util.FluentBackoff;
    import org.apache.beam.sdk.util.Sleeper;
    import org.joda.time.Duration;

    public class DoFnWithBackoff extends DoFn<JmsRecord, MyCityPojo> {

        private static final FluentBackoff BACKOFF = FluentBackoff.DEFAULT
            .withMaxRetries(5)
            .withInitialBackoff(Duration.standardSeconds(5));

        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            Sleeper sleeper = Sleeper.DEFAULT;
            BackOff backoff = BACKOFF.backoff();
            while (true) {
                try {
                    String payload = c.element().getPayload();
                    MyCityPojo city = new MyCityPojo(payload); // can raise an exception
                    c.output(city);
                    return; // success: leave the retry loop
                } catch (Exception e) {
                    if (e instanceof BusinessException) {
                        // in case of a custom business exception, we don't retry
                        throw e;
                    }
                    // retry, changing the content if required
                    if (!BackOffUtils.next(sleeper, backoff)) {
                        // we retried the max number of times
                        throw e;
                    }
                }
            }
        }
    }

The Apache Beam backoff utilities give you complete control, so you can implement your own retry policy.
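The control flow of this retry loop can also be tried outside a pipeline. The sketch below is plain Java with no Beam dependency: `RetrySketch`, `Pauser` and `NonRetryableException` are hypothetical names standing in for the sleeper and the business exception, and the 3/2 growth of the delay is an assumption. Passing the pause hook as a parameter lets a test skip the real waiting.

```java
import java.util.concurrent.Callable;

/** Plain-Java sketch of a retry loop with backoff (hypothetical, not Beam code). */
class RetrySketch {

  /** Stand-in for a business exception that must not be retried. */
  static class NonRetryableException extends RuntimeException {
    NonRetryableException(String message) { super(message); }
  }

  /** Pause hook, injectable so that tests do not really sleep. */
  interface Pauser {
    void pause(long millis) throws InterruptedException;
  }

  static <T> T retry(Callable<T> task, int maxRetries, long initialMillis, Pauser pauser)
      throws Exception {
    long delay = initialMillis;
    int retries = 0;
    while (true) {
      try {
        return task.call(); // success: leave the loop
      } catch (NonRetryableException e) {
        throw e; // never retried
      } catch (Exception e) {
        if (retries >= maxRetries) {
          throw e; // retry budget exhausted: surface the last failure
        }
        retries++;
        pauser.pause(delay);
        delay = delay * 3 / 2; // wait longer before the next attempt
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int[] attempts = {0};
    // fails twice with a transient error, then succeeds
    String result = retry(() -> {
      if (++attempts[0] < 3) {
        throw new java.io.IOException("transient failure");
      }
      return "ok";
    }, 5, 100, Thread::sleep);
    System.out.println(result + " after " + attempts[0] + " attempts");
  }
}
```

Two details are easy to get wrong in such a loop: it has to exit explicitly on success, and the non-retryable case has to be checked before any waiting happens.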