Apache Karaf Decanter 2.4.0, new processing layer

Up to Apache Karaf Decanter 2.3.0, the collection workflow was pretty simple: collect and append.

In Karaf Decanter 2.4.0, we introduced a new optional layer in between: the processing. It means that the workflow can be now collect, process, and append.

A processor get data from the collectors and do a logic processing before sending the event in the Decanter dispatcher in destination of the appenders.

The purpose is to be able to apply any kind of processing before storing/sending the collected data.

To use and enable this workflow, you just have to install a processor and change the appenders to listen data from the processor.

Example of aggregate processor

A first processor is available in Karaf Decanter 2.4.0: the timed aggregator.

By default, each data collected by collectors is sent directly to the appenders. For instance, it means that the JMX collectors will send one event per MBean every minute by default. If the appender used is a REST appender, it means that we will call the remote REST endpoint every minutes for each MBean, meaning potentially a bunch of calls.

The idea of the aggregate processor is to “group” collected data to reduce the number of calls to the appenders.

Enabling the aggregate processor is simple. For the collectors, nothing changes, you still install the collectors as you use to do.

Then, you install the aggregate processor with the decanter-processor-aggregate feature:

karaf@root()> feature:install decanter-processor-aggregate

This feature installs etc/org.apache.karaf.decanter.processor.aggregate.cfg configuration file allowing you to configure the aggregator:

## Destination dispatcher topics where to send the aggregated events##target.topics=decanter/process/aggregate## Aggregation period in seconds##period=60## Flag defining if properties are appended (and prefixed) in the aggregated event (merge)# or overwritten.# If true, when aggregating, if a property already exists in the aggregator, the value is# overwritten.# If false, when aggregating, any property is prefixed with index and the event contains# all aggregated properties.##overwrite=false

You can see by default, the aggregator groups 60 seconds of collected data and send to decanter/process/aggregate Decanter dispatcher topic (it’s the topic the appenders should listen if they want to append “aggregated” data). The overwrite property define the way of aggregating collected data.

If overwrite is false (default), the collected data properties are prefixed. It means that if the aggregator receives a first collected event containing foo=bar, and a second event containing foo=bar, then, the aggregated event created by the aggregator will be:

0.foo=bar1.foo=bar

If overwrite is true, the collected data properties are overwritten. It means that if the aggregator received a first collected event containing foo=old, and a second event containing foo=bar, then, the aggregated event created by the aggregator will be:

foo=bar

Now, let’s install the log appender (for the example) using the decanter-appender-log feature:

karaf@root()> feature:install decanter-appender-log

By default, the Decanter appenders are listening on decanter/collect/* dispatcher topics. It means it’s directly the data coming from the collectors.

If we want an appender to get data from a processor instead, we have to change the listening topic in the appender configuration.
In our example, we want to get data from the aggregate processor instead of the collectors. In the log appender configuration file (etc/org.apache.karaf.decanter.appender.log.cfg), we just change the events.topics property:

event.topics=decanter/process/*

It’s possible to use a wildcard like here (matching any processor) or a specific topic (from a specific processor).

Now, you will see that, whatever the collected data, you will have a single event per minute coming from the aggregator.

Chaining processing logic

Thanks to the Decanter dispatcher topics, it’s possible to chain processors to create your own workflow.

Like for the appenders, a processor also uses event.topics property to define on which topic the processor is listening.

Basically, it means that a processor listen on event.topics and send on target.topics.

So, you can configure a first processor listening on decanter/collect/* (data coming from the collectors) and sending to decanter/process/first. Then, you can have a second processor listening on decanter/process/first and sending to decanter/process/second, and so on. Finally, at the end of the processing chain, the appender should listen on the topic of the latest processor (for instance decanter/process/last).

It's a very flexible approach allowing you to implement you custom workflow.

Comments

Popular posts from this blog

Quarkus and "meta" extension

Getting started with Apache Karaf Minho

Apache Karaf Minho and OpenTelemetry